计算化学公社

 找回密码 Forget password
 注册 Register
Views: 427|回复 Reply: 1
打印 Print 上一主题 Last thread 下一主题 Next thread

[程序/脚本开发] GPUMD反应性动力学物种统计后处理绘图

[复制链接 Copy URL]

1

帖子

0

威望

199

eV
积分
200

Level 3 能力者

跳转到指定楼层 Go to specific reply
楼主
GPUMD-4.0发布了预训练的NEP89势函数,从文章上看很强大,原文甲烷燃烧reactive MD的case做了一个复现,发现之前没怎么接触过,不知道用什么分析轨迹,按我的理解写了个可以迅速产生物种分布曲线的code,首先是GPUMD的输入文件http://run.in控制gpumd的MD行为
potential    nep89_20250409.txtvelocity     300time_step    0.1ensemble     nvt_lan 300 300 100dump_thermo   10run       5000ensemble     nvt_lan 1500 3000 100dump_thermo   10dump_exyz 10 0 1 1 1run       1000000
可以看出是Langevin thermostat的NVT系综先在室温300K预平衡0.5 ps,然后Langevin thermostat升温到3000K,每1fs保存一帧,共跑100 ps,具体的MD参数没有进行测试和调整,本文只是暂时将全流程跑通
接下来,进入后处理环节,先是轨迹的并行读写,这个性能瓶颈主要在磁盘性能,此处假设内存够用,不考虑分批读入处理,而是并行读入直接处理,本文使用了1800原子的CH4-O2 box,10w帧结构读入需要75s,相比串行加速显著。
import osfrom multiprocessing import Pool, cpu_countimport time # 用于计时from tqdm import tqdmfrom ase.io import read # 取消注释,如果您在脚本顶部没有导入def read_single_xyz_file(filepath):    """    工作函数:读取单个XYZ文件并返回其中的所有Atoms对象列表。    Args:        filepath (str): XYZ文件的完整路径。    Returns:        list: 包含从文件中读取的ASE Atoms对象的列表。              如果文件读取失败,返回空列表。    """    try:        # 确保在工作进程中导入 ASE 的 read 函数,或者它在全局可用        from ase.io import read # 在工作函数内导入,以确保在子进程中可用        # index=":" 读取文件中的所有帧/图像        atoms_list_from_file = read(filepath, index=":")        # print(f"Successfully read {len(atoms_list_from_file)} frames from {filepath}") # 用于调试        return atoms_list_from_file    except FileNotFoundError:        # 在工作进程中打印警告通常不太好,因为输出可能会交错。        # 更好的做法是返回一个特殊值或让主进程处理。        # 但为了简单起见,这里保留打印。        # print(f"警告 (工作进程): 文件未找到 {filepath}")        return [] # 返回空列表表示失败或文件不存在    except Exception as e:        # print(f"错误 (工作进程): 读取文件 {filepath} 时发生错误: {e}")        return []if __name__ == '__main__':    # 1. 生成文件名列表    base_path = "/root/work/packmol/CH4-O2/" # 修改为您的实际路径    filenames_to_read = []    # NEP89论文中的范围是 100 到 999900 (不包括 999990),步长100    # 为了测试,我们可以使用一个较小的范围或确保文件存在    # 示例范围,请根据您的实际文件进行调整    START = 10 # equal dump_step    END = 1000000 # equal MD run steps    dump_step = 10    for i in range(START, END + 1, dump_step): # 原始范围        filename = f"dump.{i}.xyz"        full_filepath = os.path.join(base_path, filename)        # (可选) 检查文件是否存在,以避免向工作进程发送不存在的文件路径        # if os.path.exists(full_filepath):        #     filenames_to_read.append(full_filepath)        # else:        #     print(f"注意: 文件 {full_filepath} 不存在,将跳过。")        filenames_to_read.append(full_filepath) # 假设所有文件都存在,或者让工作函数处理FileNotFoundError    if not filenames_to_read:        print("没有找到需要读取的文件。请检查路径和文件名模式。")        exit()    print(f"准备读取 {len(filenames_to_read)} 个文件...")    # 2. 设置并行处理    num_processes = cpu_count()  # 使用所有可用的CPU核心    # num_processes = max(1, cpu_count() - 2) # 或者保留一些核心给系统    print(f"将使用 {num_processes} 个进程进行并行读取...")    list_of_atoms_lists = []    start_time_parallel = time.time()    # 3. 创建进程池并分发任务    with Pool(processes=num_processes) as pool:        # pool.map 会将 filenames_to_read 列表中的每个文件名传递给 read_single_xyz_file 函数        # 它会阻塞直到所有结果都计算完毕并返回一个列表,其中每个元素是 read_single_xyz_file 的返回值        results_iterator = pool.imap(read_single_xyz_file, filenames_to_read) # 使用 imap 以便更流畅地更新进度条                # 使用tqdm包装迭代器以显示进度        for result in tqdm(results_iterator, total=len(filenames_to_read), desc="并行读取文件", unit="file"):            list_of_atoms_lists.append(result)                end_time_parallel = time.time()    print(f"并行读取完成,耗时: {end_time_parallel - start_time_parallel:.4f} 秒")    # 4. 合并结果    images_parallel = []    for sublist in tqdm(list_of_atoms_lists, desc="合并帧列表", unit="file_list"):        if sublist: # 确保子列表不为空            images_parallel.extend(sublist) # 使用 extend 来合并列表    print(f"总共读取了 {len(images_parallel)} 帧 (并行)。")
然后是基于邻接矩阵连通性的分子物种识别,基于numba/JIT等方案做一些加速
import numpy as npfrom scipy.spatial import cKDTreeimport numbafrom ase.data import covalent_radii# --- 常量和辅助函数 ---_COVALENT_RADII_INCOMPLETE = covalent_radiidef get_atomic_radii(atomic_numbers, covalent_radii_source=_COVALENT_RADII_INCOMPLETE):    """    根据原子序数数组获取共价半径数组。    """    radii = np.zeros(len(atomic_numbers), dtype=float)    max_atomic_number_in_source = len(covalent_radii_source) - 1    for i, num in enumerate(atomic_numbers):        if 0 < num <= max_atomic_number_in_source and covalent_radii_source[num] > 0:            radii = covalent_radii_source[num]        else:            # 对于源中没有的或半径为0的元素,使用一个通用默认值或引发错误            # print(f"Warning: Covalent radius for atomic number {num} not found or is zero. Using default 0.7 Å.")            radii = 0.7  # 一个非常通用的默认值    return radii@numba.jit(nopython=True, fastmath=True)def _get_bonded_pairs_numba(positions_np, atom_radii_np, potential_pairs_indices_np, cutoff_multiplier):    """    Numba JIT 编译函数,用于从潜在的原子对中精确筛选出成键的原子对。    返回一个成键原子对的列表 [(idx1, idx2), ...]。    """    num_potential_pairs = potential_pairs_indices_np.shape[0]    bonded_pairs_list = [] # 在Numba nopython模式下,列表的类型会在第一次append时确定    for k in range(num_potential_pairs):        i = potential_pairs_indices_np[k, 0]        j = potential_pairs_indices_np[k, 1]        # 计算这对原子的特定截断距离的平方        cutoff_sum_radii = (atom_radii_np + atom_radii_np[j]) * cutoff_multiplier        cutoff_sq = cutoff_sum_radii * cutoff_sum_radii        # 计算实际距离的平方 (避免开方)        dist_sq = 0.0        for dim_idx in range(positions_np.shape[1]): # 假设 positions_np 是 (N, 3)            diff = positions_np[i, dim_idx] - positions_np[j, dim_idx]            dist_sq += diff * diff        # 成键判断 (dist_sq > 1e-10 避免自身或几乎重叠的原子被视为键)        if dist_sq < cutoff_sq and dist_sq > 1e-10:            bonded_pairs_list.append((i, j)) # Numba可以处理元组列表    return bonded_pairs_list# --- 主函数 ---def get_fragments_optimized(image_data, cutoff_multiplier=1.15, covalent_radii_source=_COVALENT_RADII_INCOMPLETE):    """    优化后的获取分子碎片的函数。    Args:        image_data: ASE Atoms 对象,或者一个包含以下两个元素的元组:                    (positions_np, atomic_numbers_np)                    positions_np: (N, 3) NumPy 数组,原子坐标                    atomic_numbers_np: (N,) NumPy 数组,原子序数        cutoff_multiplier: 用于 (r_i + r_j) * multiplier 计算截断距离的乘数。        covalent_radii_source: 提供共价半径的NumPy数组,索引为原子序数。    Returns:        list_of_fragments: 一个列表,其中每个元素是另一个列表,                           包含属于同一碎片的原子索引。                           例如: [[0, 1, 2], [3, 4], [5]]    """    if hasattr(image_data, 'get_positions') and hasattr(image_data, 'get_atomic_numbers'): # ASE Atoms like object        positions_np = image_data.get_positions()        atomic_numbers_np = image_data.get_atomic_numbers()        original_ase_object = image_data # 如果需要返回ASE Atoms子集时使用    elif isinstance(image_data, tuple) and len(image_data) == 2 and \         isinstance(image_data[0], np.ndarray) and isinstance(image_data[1], np.ndarray):        positions_np, atomic_numbers_np = image_data        original_ase_object = None    else:        raise ValueError("Input 'image_data' must be an ASE Atoms-like object or a tuple of (positions_np, atomic_numbers_np)")    n_atoms = len(positions_np)    if n_atoms == 0:        return []    if n_atoms == 1:        return [[0]] # 单个原子自成一个碎片    atom_radii_np = get_atomic_radii(atomic_numbers_np, covalent_radii_source)    # 估算cKDTree所需的最大搜索半径    # 一个相对宽松的估算:(体系中最大的两个半径之和) * multiplier    # 至少需要两个原子才能形成键    if n_atoms > 1:        sorted_radii = np.sort(atom_radii_np[atom_radii_np > 0]) # 只考虑有效半径        if len(sorted_radii) >= 2:            max_search_radius = (sorted_radii[-1] + sorted_radii[-2]) * cutoff_multiplier        elif len(sorted_radii) == 1: # 只有一个原子有有效半径 (其他是0或未定义)             max_search_radius = (sorted_radii[-1] + sorted_radii[-1]) * cutoff_multiplier # 估算其自身大小        else: # 所有原子半径都无效            max_search_radius = 0.1 # 几乎不会找到任何对    else: # n_atoms == 1,已在前面处理        max_search_radius = 0.1    # 使用cKDTree查找潜在的近邻对    # 如果max_search_radius过小(例如0),query_pairs可能不工作或返回空    if max_search_radius < 1e-9: # 防止半径过小导致问题        # 所有原子都是孤立的        return [ for i in range(n_atoms)]    kdtree = cKDTree(positions_np)    potential_pairs_indices_np = kdtree.query_pairs(r=max_search_radius, output_type='ndarray')    # 使用Numba加速的函数精确判断连接性    bonded_pairs = _get_bonded_pairs_numba(        positions_np, atom_radii_np, potential_pairs_indices_np, cutoff_multiplier    )    # --- 使用BFS查找连通分量 ---    adj = [[] for _ in range(n_atoms)]    for i, j in bonded_pairs:        adj.append(j)        adj[j].append(i)    visited = np.zeros(n_atoms, dtype=bool)    fragments_indices_list = []    for i in range(n_atoms):        if not visited:            current_fragment_indices = []            q = # 使用列表作为队列            visited = True            head = 0            while head < len(q): # BFS 主循环                u = q[head]                head += 1                current_fragment_indices.append(u)                for v_neighbor in adj:                    if not visited[v_neighbor]:                        visited[v_neighbor] = True                        q.append(v_neighbor)            #fragments_indices_list.append(sorted(current_fragment_indices)) # 排序使输出稳定            fragments_indices_list.append(image_data[current_fragment_indices].get_chemical_formula())    # 如果想返回ASE Atoms对象列表 (如果输入是ASE Atoms对象)    # if original_ase_object is not None:    #     return [original_ase_object[indices] for indices in fragments_indices_list]        return fragments_indices_list
可以看到返回的直接是一个List[Str],list内部是化学式
然后则是多进程的执行上述物种识别,CPU占用率可以吃满,选择joblib库执行多进程,并且用python内置的Counter进行计数,10w帧1800原子的结构总共花费97s
from collections import Counterfrom joblib import Parallel, delayedfrom multiprocessing import Pool, cpu_countdef process_single_image_for_counter(image_data):    """    处理单个image数据,返回其碎片计数的Counter对象。    """    # 调用我们之前优化的 get_fragments_optimized 函数    # 它返回一个列表,其中每个元素是另一个列表,包含属于同一碎片的原子索引。    # 例如: [[0, 1, 2], [3, 4], [0, 1, 2]] (假设我们想统计重复的碎片定义)    list_of_fragment_indices = get_fragments_optimized(image_data)    return Counter(list_of_fragment_indices)if __name__ == '__main__':    num_processes = max(1, cpu_count())    all_frame_counters  = Parallel(n_jobs=num_processes)(        delayed(process_single_image_for_counter)(image) for image in tqdm(images_parallel)    )
最后进行物种的统计和绘图
from collections import Counterimport pandas as pd# 1. 获取所有帧中出现过的所有物质名称(作为DataFrame的列名)all_species_names = set()for frame_counter in all_frame_counters:    all_species_names.update(frame_counter.keys())sorted_species_names = sorted(list(all_species_names)) # 排序以保证列的顺序一致# 2. 构建一个字典列表,每个字典代表一帧的数据#    如果某一帧中某种物质不存在,其数量为0data_for_df = []for frame_idx, frame_counter in enumerate(all_frame_counters):    row_data = {'frame': frame_idx} # 可以用帧号作为索引或一列    for species_name in sorted_species_names:        row_data[species_name] = frame_counter.get(species_name, 0) # get(key, 0)确保不存在的物质数量为0    data_for_df.append(row_data)# 3. 将字典列表转换为Pandas DataFramedf_species_counts = pd.DataFrame(data_for_df)# 4. (可选) 将 'frame' 列设为索引if 'frame' in df_species_counts.columns:    df_species_counts = df_species_counts.set_index('frame')# --- 现在 df_species_counts 就是汇总后的结果 ---print("--- 汇总后每一帧各物质数量 (Pandas DataFrame) ---")print(df_species_counts.head()) # 打印前几行查看# 你可以轻松地进行后续操作,例如:# 打印特定物质随时间的变化if 'H2O' in df_species_counts.columns:    print("\n--- H2O 数量随时间变化 ---")    print(df_species_counts['H2O'])# 绘制主要物质数量变化图 (需要 matplotlib)try:    import matplotlib.pyplot as plt        # 选择一些主要物质进行绘图    main_species_to_plot = ['O2', 'CH4', 'H2O', 'CO2', 'CO']    plot_columns = [col for col in main_species_to_plot if col in df_species_counts.columns]        if plot_columns:        df_species_counts[plot_columns].plot(figsize=(12, 6))        plt.title('主要物质数量随时间帧的变化')        plt.xlabel('时间帧 (Frame Number)')        plt.ylabel('分子数量 (Number of Molecules)')        plt.legend(title='物质')        plt.grid(True)        plt.tight_layout()        plt.show()    else:        print("\n选择用于绘图的主要物质在数据中均未找到。")        except ImportError:    print("\nMatplotlib 未安装,无法绘制图形。请运行 'pip install matplotlib' 安装。")
此处得到的图效果是这样的:
还可以进行更精细的绘图,类似RL训练曲线一样做移动平均+方差置信度填充,绘图没有过多调整,x轴应该从帧数改为模拟时间,wsl2绘图的中文字体问题也有点bug,有空可以再改,至此整个流程就跑通了,得到了一个迅速统计物种分布-出图的方案
import pandas as pdimport numpy as npimport matplotlib.pyplot as pltimport matplotlib.font_manager as fm # 用于中文字体# --- 模拟生成类似您图片中的噪声数据 ---# 假设您已经有了一个名为 df_original_counts 的 DataFrame# df_original_counts 的索引是帧号,列是各种物质的名称# 为了演示,我们创建一些虚拟数据num_frames = 10000 # 与您图片中的X轴范围类似frame_numbers = np.arange(num_frames)df_original_counts = df_species_counts# 确保数据不为负df_original_counts[df_original_counts < 0] = 0# --- 计算移动平均和移动标准差 ---# window_size 决定了平滑的程度。值越大,曲线越平滑。# 您需要根据您的数据特性来调整这个值。window_size = 100  # 尝试不同的值,例如 50, 100, 200, 500std_multiplier = 1 # 标准差的倍数,用于定义方差带的宽度 (例如1代表 ±1个标准差)# 创建新的DataFrame来存储平滑后的数据和标准差df_smoothed_counts = pd.DataFrame(index=df_original_counts.index)df_rolling_std = pd.DataFrame(index=df_original_counts.index)for column in df_original_counts.columns:    # 计算移动平均    # center=True 表示窗口的中心对齐当前数据点,避免相位滞后    # min_periods=1 允许在数据序列的开头/结尾处,即使数据点少于window_size也计算平均值    df_smoothed_counts[column] = df_original_counts[column].rolling(window=window_size, center=True, min_periods=1).mean()        # 计算移动标准差    df_rolling_std[column] = df_original_counts[column].rolling(window=window_size, center=True, min_periods=1).std()# --- 绘图 ---# 尝试查找一个可用的中文字体try:    font_path = fm.findfont(fm.FontProperties(family=['sans-serif']))     if 'uming' in font_path.lower() or 'msyh' in font_path.lower() or 'simhei' in font_path.lower() or 'pingfang' in font_path.lower():         plt.rcParams['font.sans-serif'] = [fm.FontProperties(fname=font_path).get_name()]    else:         plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'WenQuanYi Zen Hei','PingFang SC']    plt.rcParams['axes.unicode_minus'] = Falseexcept Exception:    print("警告:未能成功设置中文字体,标签可能显示为方块。请确保您的系统安装了合适的中文字体。")plt.figure(figsize=(14, 8)) # 设置图形大小# 获取默认颜色循环,以便线条和其方差带颜色一致colors = plt.rcParams['axes.prop_cycle'].by_key()['color']# 绘制平滑后的数据和方差带product = ['CH4', "O2", "CO", "CO2", "H2O"]#for i, column in enumerate(df_smoothed_counts.columns):for i, column in enumerate(product):    mean_line = df_smoothed_counts[column]    std_dev = df_rolling_std[column]        # 获取当前列的颜色    current_color = colors[i % len(colors)]        # 绘制移动平均线    plt.plot(mean_line.index, mean_line, label=f'{column} (移动平均, w={window_size})', linewidth=2, color=current_color)        # 绘制方差带 (mean ± N * std_dev)    # fill_between 用于填充两条水平曲线之间的区域    plt.fill_between(mean_line.index,                      mean_line - std_multiplier * std_dev,                      mean_line + std_multiplier * std_dev,                      color=current_color,                      alpha=0.2, # 设置透明度使带状区域半透明                     label=f'{column} (±{std_multiplier}σ)')plt.xlabel("时间 (Frame Number)", fontsize=14)plt.ylabel("数量 (Number of Molecules)", fontsize=14)plt.title(f"物质数量变化图 (移动平均窗口: {window_size}, 方差带: ±{std_multiplier}σ)", fontsize=16)# 创建图例:由于fill_between也会创建图例项,我们可能需要手动控制或筛选图例handles, labels = plt.gca().get_legend_handles_labels()# 只保留移动平均线的图例项,避免重复显示方差带的图例# (这是一个简单的筛选方法,可能需要根据实际标签调整)unique_labels = {}for handle, label in zip(handles, labels):    if "移动平均" in label and label not in unique_labels : # 只保留包含 "移动平均" 的标签,并去重        unique_labels[label] = handle#plt.legend(unique_labels.values(), unique_labels.keys(), fontsize=10)plt.grid(True, linestyle='--', alpha=0.7)plt.ylim(bottom=0) plt.xlim(left=0, right=num_frames) plt.tight_layout()plt.show()


主要物种统计分布-移动平均图
全部物种统计分布-移动平均图
最后来比较一下论文原图,可以看出趋势类似,但也不完全相同,且原文没有做这种移动平均的平滑,当然这种reactive MD往往需要多条轨迹,同时精细调整参数,暂且不提。

NEP89甲烷燃烧case
详情可参照原文(https://arxiv.org/pdf/2504.21286
最后附上可直接运行的脚本作为参考,似乎市面上也存在大量类似的后处理程序,但总归感觉自己写的这个比较简洁明了,而且性能完全OK,希望可以为大家作为参考,本方案的独特之处大概就是轻量级比较清晰明了的纯Python代码,处理这个10w帧的case也只需要几分钟,DIY和修改相应的IO和绘图也很容易,仅作参考

MDAnalysis.py

18.24 KB, 下载次数 Times of downloads: 7

MDAnalysis.ipynb

353.78 KB, 下载次数 Times of downloads: 5

465

帖子

1

威望

2318

eV
积分
2803

Level 5 (御坂)

2#
发表于 Post on 2025-5-15 09:52:47 | 只看该作者 Only view this author
发帖的时候,用一下“高级模式”吧,里面有代码块的功能。

本版积分规则 Credits rule

手机版 Mobile version|北京科音自然科学研究中心 Beijing Kein Research Center for Natural Sciences|京公网安备 11010502035419号|计算化学公社 — 北京科音旗下高水平计算化学交流论坛 ( 京ICP备14038949号-1 )|网站地图

GMT+8, 2025-8-14 04:13 , Processed in 0.195643 second(s), 24 queries , Gzip On.

快速回复 返回顶部 返回列表 Return to list