GPUMD-4.0发布了预训练的NEP89势函数,从文章上看很强大,原文甲烷燃烧reactive MD的case做了一个复现,发现之前没怎么接触过,不知道用什么分析轨迹,按我的理解写了个可以迅速产生物种分布曲线的code,首先是GPUMD的输入文件http://run.in控制gpumd的MD行为 potential nep89_20250409.txtvelocity 300time_step 0.1ensemble nvt_lan 300 300 100dump_thermo 10run 5000ensemble nvt_lan 1500 3000 100dump_thermo 10dump_exyz 10 0 1 1 1run 1000000
可以看出是Langevin thermostat的NVT系综先在室温300K预平衡0.5 ps,然后Langevin thermostat升温到3000K,每1fs保存一帧,共跑100 ps,具体的MD参数没有进行测试和调整,本文只是暂时将全流程跑通 接下来,进入后处理环节,先是轨迹的并行读写,这个性能瓶颈主要在磁盘性能,此处假设内存够用,不考虑分批读入处理,而是并行读入直接处理,本文使用了1800原子的CH4-O2 box,10w帧结构读入需要75s,相比串行加速显著。 import osfrom multiprocessing import Pool, cpu_countimport time # 用于计时from tqdm import tqdmfrom ase.io import read # 取消注释,如果您在脚本顶部没有导入def read_single_xyz_file(filepath): """ 工作函数:读取单个XYZ文件并返回其中的所有Atoms对象列表。 Args: filepath (str): XYZ文件的完整路径。 Returns: list: 包含从文件中读取的ASE Atoms对象的列表。 如果文件读取失败,返回空列表。 """ try: # 确保在工作进程中导入 ASE 的 read 函数,或者它在全局可用 from ase.io import read # 在工作函数内导入,以确保在子进程中可用 # index=":" 读取文件中的所有帧/图像 atoms_list_from_file = read(filepath, index=":") # print(f"Successfully read {len(atoms_list_from_file)} frames from {filepath}") # 用于调试 return atoms_list_from_file except FileNotFoundError: # 在工作进程中打印警告通常不太好,因为输出可能会交错。 # 更好的做法是返回一个特殊值或让主进程处理。 # 但为了简单起见,这里保留打印。 # print(f"警告 (工作进程): 文件未找到 {filepath}") return [] # 返回空列表表示失败或文件不存在 except Exception as e: # print(f"错误 (工作进程): 读取文件 {filepath} 时发生错误: {e}") return []if __name__ == '__main__': # 1. 生成文件名列表 base_path = "/root/work/packmol/CH4-O2/" # 修改为您的实际路径 filenames_to_read = [] # NEP89论文中的范围是 100 到 999900 (不包括 999990),步长100 # 为了测试,我们可以使用一个较小的范围或确保文件存在 # 示例范围,请根据您的实际文件进行调整 START = 10 # equal dump_step END = 1000000 # equal MD run steps dump_step = 10 for i in range(START, END + 1, dump_step): # 原始范围 filename = f"dump.{i}.xyz" full_filepath = os.path.join(base_path, filename) # (可选) 检查文件是否存在,以避免向工作进程发送不存在的文件路径 # if os.path.exists(full_filepath): # filenames_to_read.append(full_filepath) # else: # print(f"注意: 文件 {full_filepath} 不存在,将跳过。") filenames_to_read.append(full_filepath) # 假设所有文件都存在,或者让工作函数处理FileNotFoundError if not filenames_to_read: print("没有找到需要读取的文件。请检查路径和文件名模式。") exit() print(f"准备读取 {len(filenames_to_read)} 个文件...") # 2. 设置并行处理 num_processes = cpu_count() # 使用所有可用的CPU核心 # num_processes = max(1, cpu_count() - 2) # 或者保留一些核心给系统 print(f"将使用 {num_processes} 个进程进行并行读取...") list_of_atoms_lists = [] start_time_parallel = time.time() # 3. 创建进程池并分发任务 with Pool(processes=num_processes) as pool: # pool.map 会将 filenames_to_read 列表中的每个文件名传递给 read_single_xyz_file 函数 # 它会阻塞直到所有结果都计算完毕并返回一个列表,其中每个元素是 read_single_xyz_file 的返回值 results_iterator = pool.imap(read_single_xyz_file, filenames_to_read) # 使用 imap 以便更流畅地更新进度条 # 使用tqdm包装迭代器以显示进度 for result in tqdm(results_iterator, total=len(filenames_to_read), desc="并行读取文件", unit="file"): list_of_atoms_lists.append(result) end_time_parallel = time.time() print(f"并行读取完成,耗时: {end_time_parallel - start_time_parallel:.4f} 秒") # 4. 合并结果 images_parallel = [] for sublist in tqdm(list_of_atoms_lists, desc="合并帧列表", unit="file_list"): if sublist: # 确保子列表不为空 images_parallel.extend(sublist) # 使用 extend 来合并列表 print(f"总共读取了 {len(images_parallel)} 帧 (并行)。")
然后是基于邻接矩阵连通性的分子物种识别,基于numba/JIT等方案做一些加速 import numpy as npfrom scipy.spatial import cKDTreeimport numbafrom ase.data import covalent_radii# --- 常量和辅助函数 ---_COVALENT_RADII_INCOMPLETE = covalent_radiidef get_atomic_radii(atomic_numbers, covalent_radii_source=_COVALENT_RADII_INCOMPLETE): """ 根据原子序数数组获取共价半径数组。 """ radii = np.zeros(len(atomic_numbers), dtype=float) max_atomic_number_in_source = len(covalent_radii_source) - 1 for i, num in enumerate(atomic_numbers): if 0 < num <= max_atomic_number_in_source and covalent_radii_source[num] > 0: radii = covalent_radii_source[num] else: # 对于源中没有的或半径为0的元素,使用一个通用默认值或引发错误 # print(f"Warning: Covalent radius for atomic number {num} not found or is zero. Using default 0.7 Å.") radii = 0.7 # 一个非常通用的默认值 return radii@numba.jit(nopython=True, fastmath=True)def _get_bonded_pairs_numba(positions_np, atom_radii_np, potential_pairs_indices_np, cutoff_multiplier): """ Numba JIT 编译函数,用于从潜在的原子对中精确筛选出成键的原子对。 返回一个成键原子对的列表 [(idx1, idx2), ...]。 """ num_potential_pairs = potential_pairs_indices_np.shape[0] bonded_pairs_list = [] # 在Numba nopython模式下,列表的类型会在第一次append时确定 for k in range(num_potential_pairs): i = potential_pairs_indices_np[k, 0] j = potential_pairs_indices_np[k, 1] # 计算这对原子的特定截断距离的平方 cutoff_sum_radii = (atom_radii_np + atom_radii_np[j]) * cutoff_multiplier cutoff_sq = cutoff_sum_radii * cutoff_sum_radii # 计算实际距离的平方 (避免开方) dist_sq = 0.0 for dim_idx in range(positions_np.shape[1]): # 假设 positions_np 是 (N, 3) diff = positions_np[i, dim_idx] - positions_np[j, dim_idx] dist_sq += diff * diff # 成键判断 (dist_sq > 1e-10 避免自身或几乎重叠的原子被视为键) if dist_sq < cutoff_sq and dist_sq > 1e-10: bonded_pairs_list.append((i, j)) # Numba可以处理元组列表 return bonded_pairs_list# --- 主函数 ---def get_fragments_optimized(image_data, cutoff_multiplier=1.15, covalent_radii_source=_COVALENT_RADII_INCOMPLETE): """ 优化后的获取分子碎片的函数。 Args: image_data: ASE Atoms 对象,或者一个包含以下两个元素的元组: (positions_np, atomic_numbers_np) positions_np: (N, 3) NumPy 数组,原子坐标 atomic_numbers_np: (N,) NumPy 数组,原子序数 cutoff_multiplier: 用于 (r_i + r_j) * multiplier 计算截断距离的乘数。 covalent_radii_source: 提供共价半径的NumPy数组,索引为原子序数。 Returns: list_of_fragments: 一个列表,其中每个元素是另一个列表, 包含属于同一碎片的原子索引。 例如: [[0, 1, 2], [3, 4], [5]] """ if hasattr(image_data, 'get_positions') and hasattr(image_data, 'get_atomic_numbers'): # ASE Atoms like object positions_np = image_data.get_positions() atomic_numbers_np = image_data.get_atomic_numbers() original_ase_object = image_data # 如果需要返回ASE Atoms子集时使用 elif isinstance(image_data, tuple) and len(image_data) == 2 and \ isinstance(image_data[0], np.ndarray) and isinstance(image_data[1], np.ndarray): positions_np, atomic_numbers_np = image_data original_ase_object = None else: raise ValueError("Input 'image_data' must be an ASE Atoms-like object or a tuple of (positions_np, atomic_numbers_np)") n_atoms = len(positions_np) if n_atoms == 0: return [] if n_atoms == 1: return [[0]] # 单个原子自成一个碎片 atom_radii_np = get_atomic_radii(atomic_numbers_np, covalent_radii_source) # 估算cKDTree所需的最大搜索半径 # 一个相对宽松的估算:(体系中最大的两个半径之和) * multiplier # 至少需要两个原子才能形成键 if n_atoms > 1: sorted_radii = np.sort(atom_radii_np[atom_radii_np > 0]) # 只考虑有效半径 if len(sorted_radii) >= 2: max_search_radius = (sorted_radii[-1] + sorted_radii[-2]) * cutoff_multiplier elif len(sorted_radii) == 1: # 只有一个原子有有效半径 (其他是0或未定义) max_search_radius = (sorted_radii[-1] + sorted_radii[-1]) * cutoff_multiplier # 估算其自身大小 else: # 所有原子半径都无效 max_search_radius = 0.1 # 几乎不会找到任何对 else: # n_atoms == 1,已在前面处理 max_search_radius = 0.1 # 使用cKDTree查找潜在的近邻对 # 如果max_search_radius过小(例如0),query_pairs可能不工作或返回空 if max_search_radius < 1e-9: # 防止半径过小导致问题 # 所有原子都是孤立的 return [ for i in range(n_atoms)] kdtree = cKDTree(positions_np) potential_pairs_indices_np = kdtree.query_pairs(r=max_search_radius, output_type='ndarray') # 使用Numba加速的函数精确判断连接性 bonded_pairs = _get_bonded_pairs_numba( positions_np, atom_radii_np, potential_pairs_indices_np, cutoff_multiplier ) # --- 使用BFS查找连通分量 --- adj = [[] for _ in range(n_atoms)] for i, j in bonded_pairs: adj.append(j) adj[j].append(i) visited = np.zeros(n_atoms, dtype=bool) fragments_indices_list = [] for i in range(n_atoms): if not visited: current_fragment_indices = [] q = # 使用列表作为队列 visited = True head = 0 while head < len(q): # BFS 主循环 u = q[head] head += 1 current_fragment_indices.append(u) for v_neighbor in adj: if not visited[v_neighbor]: visited[v_neighbor] = True q.append(v_neighbor) #fragments_indices_list.append(sorted(current_fragment_indices)) # 排序使输出稳定 fragments_indices_list.append(image_data[current_fragment_indices].get_chemical_formula()) # 如果想返回ASE Atoms对象列表 (如果输入是ASE Atoms对象) # if original_ase_object is not None: # return [original_ase_object[indices] for indices in fragments_indices_list] return fragments_indices_list
可以看到返回的直接是一个List[Str],list内部是化学式 然后则是多进程的执行上述物种识别,CPU占用率可以吃满,选择joblib库执行多进程,并且用python内置的Counter进行计数,10w帧1800原子的结构总共花费97s from collections import Counterfrom joblib import Parallel, delayedfrom multiprocessing import Pool, cpu_countdef process_single_image_for_counter(image_data): """ 处理单个image数据,返回其碎片计数的Counter对象。 """ # 调用我们之前优化的 get_fragments_optimized 函数 # 它返回一个列表,其中每个元素是另一个列表,包含属于同一碎片的原子索引。 # 例如: [[0, 1, 2], [3, 4], [0, 1, 2]] (假设我们想统计重复的碎片定义) list_of_fragment_indices = get_fragments_optimized(image_data) return Counter(list_of_fragment_indices)if __name__ == '__main__': num_processes = max(1, cpu_count()) all_frame_counters = Parallel(n_jobs=num_processes)( delayed(process_single_image_for_counter)(image) for image in tqdm(images_parallel) )
最后进行物种的统计和绘图 from collections import Counterimport pandas as pd# 1. 获取所有帧中出现过的所有物质名称(作为DataFrame的列名)all_species_names = set()for frame_counter in all_frame_counters: all_species_names.update(frame_counter.keys())sorted_species_names = sorted(list(all_species_names)) # 排序以保证列的顺序一致# 2. 构建一个字典列表,每个字典代表一帧的数据# 如果某一帧中某种物质不存在,其数量为0data_for_df = []for frame_idx, frame_counter in enumerate(all_frame_counters): row_data = {'frame': frame_idx} # 可以用帧号作为索引或一列 for species_name in sorted_species_names: row_data[species_name] = frame_counter.get(species_name, 0) # get(key, 0)确保不存在的物质数量为0 data_for_df.append(row_data)# 3. 将字典列表转换为Pandas DataFramedf_species_counts = pd.DataFrame(data_for_df)# 4. (可选) 将 'frame' 列设为索引if 'frame' in df_species_counts.columns: df_species_counts = df_species_counts.set_index('frame')# --- 现在 df_species_counts 就是汇总后的结果 ---print("--- 汇总后每一帧各物质数量 (Pandas DataFrame) ---")print(df_species_counts.head()) # 打印前几行查看# 你可以轻松地进行后续操作,例如:# 打印特定物质随时间的变化if 'H2O' in df_species_counts.columns: print("\n--- H2O 数量随时间变化 ---") print(df_species_counts['H2O'])# 绘制主要物质数量变化图 (需要 matplotlib)try: import matplotlib.pyplot as plt # 选择一些主要物质进行绘图 main_species_to_plot = ['O2', 'CH4', 'H2O', 'CO2', 'CO'] plot_columns = [col for col in main_species_to_plot if col in df_species_counts.columns] if plot_columns: df_species_counts[plot_columns].plot(figsize=(12, 6)) plt.title('主要物质数量随时间帧的变化') plt.xlabel('时间帧 (Frame Number)') plt.ylabel('分子数量 (Number of Molecules)') plt.legend(title='物质') plt.grid(True) plt.tight_layout() plt.show() else: print("\n选择用于绘图的主要物质在数据中均未找到。") except ImportError: print("\nMatplotlib 未安装,无法绘制图形。请运行 'pip install matplotlib' 安装。")
此处得到的图效果是这样的: ![]() 还可以进行更精细的绘图,类似RL训练曲线一样做移动平均+方差置信度填充,绘图没有过多调整,x轴应该从帧数改为模拟时间,wsl2绘图的中文字体问题也有点bug,有空可以再改,至此整个流程就跑通了,得到了一个迅速统计物种分布-出图的方案 import pandas as pdimport numpy as npimport matplotlib.pyplot as pltimport matplotlib.font_manager as fm # 用于中文字体# --- 模拟生成类似您图片中的噪声数据 ---# 假设您已经有了一个名为 df_original_counts 的 DataFrame# df_original_counts 的索引是帧号,列是各种物质的名称# 为了演示,我们创建一些虚拟数据num_frames = 10000 # 与您图片中的X轴范围类似frame_numbers = np.arange(num_frames)df_original_counts = df_species_counts# 确保数据不为负df_original_counts[df_original_counts < 0] = 0# --- 计算移动平均和移动标准差 ---# window_size 决定了平滑的程度。值越大,曲线越平滑。# 您需要根据您的数据特性来调整这个值。window_size = 100 # 尝试不同的值,例如 50, 100, 200, 500std_multiplier = 1 # 标准差的倍数,用于定义方差带的宽度 (例如1代表 ±1个标准差)# 创建新的DataFrame来存储平滑后的数据和标准差df_smoothed_counts = pd.DataFrame(index=df_original_counts.index)df_rolling_std = pd.DataFrame(index=df_original_counts.index)for column in df_original_counts.columns: # 计算移动平均 # center=True 表示窗口的中心对齐当前数据点,避免相位滞后 # min_periods=1 允许在数据序列的开头/结尾处,即使数据点少于window_size也计算平均值 df_smoothed_counts[column] = df_original_counts[column].rolling(window=window_size, center=True, min_periods=1).mean() # 计算移动标准差 df_rolling_std[column] = df_original_counts[column].rolling(window=window_size, center=True, min_periods=1).std()# --- 绘图 ---# 尝试查找一个可用的中文字体try: font_path = fm.findfont(fm.FontProperties(family=['sans-serif'])) if 'uming' in font_path.lower() or 'msyh' in font_path.lower() or 'simhei' in font_path.lower() or 'pingfang' in font_path.lower(): plt.rcParams['font.sans-serif'] = [fm.FontProperties(fname=font_path).get_name()] else: plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'WenQuanYi Zen Hei','PingFang SC'] plt.rcParams['axes.unicode_minus'] = Falseexcept Exception: print("警告:未能成功设置中文字体,标签可能显示为方块。请确保您的系统安装了合适的中文字体。")plt.figure(figsize=(14, 8)) # 设置图形大小# 获取默认颜色循环,以便线条和其方差带颜色一致colors = plt.rcParams['axes.prop_cycle'].by_key()['color']# 绘制平滑后的数据和方差带product = ['CH4', "O2", "CO", "CO2", "H2O"]#for i, column in enumerate(df_smoothed_counts.columns):for i, column in enumerate(product): mean_line = df_smoothed_counts[column] std_dev = df_rolling_std[column] # 获取当前列的颜色 current_color = colors[i % len(colors)] # 绘制移动平均线 plt.plot(mean_line.index, mean_line, label=f'{column} (移动平均, w={window_size})', linewidth=2, color=current_color) # 绘制方差带 (mean ± N * std_dev) # fill_between 用于填充两条水平曲线之间的区域 plt.fill_between(mean_line.index, mean_line - std_multiplier * std_dev, mean_line + std_multiplier * std_dev, color=current_color, alpha=0.2, # 设置透明度使带状区域半透明 label=f'{column} (±{std_multiplier}σ)')plt.xlabel("时间 (Frame Number)", fontsize=14)plt.ylabel("数量 (Number of Molecules)", fontsize=14)plt.title(f"物质数量变化图 (移动平均窗口: {window_size}, 方差带: ±{std_multiplier}σ)", fontsize=16)# 创建图例:由于fill_between也会创建图例项,我们可能需要手动控制或筛选图例handles, labels = plt.gca().get_legend_handles_labels()# 只保留移动平均线的图例项,避免重复显示方差带的图例# (这是一个简单的筛选方法,可能需要根据实际标签调整)unique_labels = {}for handle, label in zip(handles, labels): if "移动平均" in label and label not in unique_labels : # 只保留包含 "移动平均" 的标签,并去重 unique_labels[label] = handle#plt.legend(unique_labels.values(), unique_labels.keys(), fontsize=10)plt.grid(True, linestyle='--', alpha=0.7)plt.ylim(bottom=0) plt.xlim(left=0, right=num_frames) plt.tight_layout()plt.show()
![]()
主要物种统计分布-移动平均图![]()
全部物种统计分布-移动平均图最后来比较一下论文原图,可以看出趋势类似,但也不完全相同,且原文没有做这种移动平均的平滑,当然这种reactive MD往往需要多条轨迹,同时精细调整参数,暂且不提。 ![]()
NEP89甲烷燃烧case详情可参照原文(https://arxiv.org/pdf/2504.21286) 最后附上可直接运行的脚本作为参考,似乎市面上也存在大量类似的后处理程序,但总归感觉自己写的这个比较简洁明了,而且性能完全OK,希望可以为大家作为参考,本方案的独特之处大概就是轻量级比较清晰明了的纯Python代码,处理这个10w帧的case也只需要几分钟,DIY和修改相应的IO和绘图也很容易,仅作参考
|