基于Python的高效批量处理Splunk Session ID并写入MySQL的解决方案

news2025/5/13 7:53:13

已经用Python实现对Splunk通过session id获取查询数据,现在要实现Python批量数据获取,通过一个列表中的大量Session ID,快速高效地获取一个数据表,考虑异常处理,多线程和异步操作以提高性能,同时将数据表写入MySQL表,获取数据的成功和失败的状态信息写入.log日志文本文件。

这个方案提供了灵活的可扩展性,可以根据具体业务需求调整数据处理逻辑、数据库结构和并发策略。

import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
import requests
import pymysql
from pymysql import MySQLError
from pymysql.cursors import DictCursor

# 配置日志记录
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('splunk_import.log'),
        logging.StreamHandler()
    ]
)

# Splunk配置
SPLUNK_API_URL = "https://your.splunk.url/services/search/results"
SPLUNK_HEADERS = {
    "Content-Type": "application/json",
    # 其他需要的请求头
}

# MySQL配置
MYSQL_CONFIG = {
    "host": "localhost",
    "user": "root",
    "password": "password",
    "database": "splunk_data",
    "charset": "utf8mb4",
    "cursorclass": DictCursor
}

# 线程安全的MySQL连接池
class MySQLPool:
    _lock = threading.Lock()
    _pool = None
    
    @classmethod
    def get_connection(cls):
        with cls._lock:
            if not cls._pool:
                cls._pool = pymysql.ConnectionPool(
                    min=2,
                    max=10,
                    **MYSQL_CONFIG
                )
            return cls._pool.get_connection()

# Splunk数据获取函数
def fetch_splunk_data(session_id):
    try:
        params = {
            "output_mode": "json",
            "session_id": session_id
        }
        
        response = requests.get(
            SPLUNK_API_URL,
            headers=SPLUNK_HEADERS,
            params=params,
            timeout=30
        )
        response.raise_for_status()
        
        return {
            "session_id": session_id,
            "data": response.json(),
            "success": True
        }
    except Exception as e:
        return {
            "session_id": session_id,
            "error": str(e),
            "success": False
        }

# 数据处理和存储函数
def process_session(session_id):
    result = fetch_splunk_data(session_id)
    
    if not result['success']:
        logging.error(f"Session {session_id} failed: {result.get('error')}")
        return False

    try:
        # 解析数据(根据实际数据结构调整)
        parsed_data = {
            "session_id": session_id,
            "event_count": len(result['data'].get('results', [])),
            "status": "success",
            # 添加其他需要存储的字段
        }
        
        # 写入MySQL
        with MySQLPool.get_connection() as conn:
            with conn.cursor() as cursor:
                sql = """INSERT INTO splunk_results 
                        (session_id, event_count, status)
                        VALUES (%s, %s, %s)"""
                cursor.execute(sql, (
                    parsed_data['session_id'],
                    parsed_data['event_count'],
                    parsed_data['status']
                ))
                conn.commit()
        
        logging.info(f"Session {session_id} processed successfully")
        return True
    
    except MySQLError as e:
        logging.error(f"MySQL error on {session_id}: {str(e)}")
        return False
    except Exception as e:
        logging.error(f"Processing error on {session_id}: {str(e)}")
        return False

# 批量处理控制器
def batch_process(session_ids, max_workers=10):
    success_count = 0
    failure_count = 0
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process_session, sid): sid for sid in session_ids}
        
        for future in as_completed(futures):
            sid = futures[future]
            try:
                if future.result():
                    success_count += 1
                else:
                    failure_count += 1
            except Exception as e:
                logging.error(f"Unexpected error processing {sid}: {str(e)}")
                failure_count += 1
    
    logging.info(f"Processing completed. Success: {success_count}, Failed: {failure_count}")

# 示例使用
if __name__ == "__main__":
    # 从文件读取Session ID列表(示例)
    with open("session_ids.txt") as f:
        session_ids = [line.strip() for line in f if line.strip()]
    
    # 启动批量处理(控制并发数)
    batch_process(session_ids, max_workers=15)

关键功能说明:

  1. 多线程处理:

    • 使用ThreadPoolExecutor管理线程池
    • 可配置最大并发数(max_workers)
    • 实现任务提交和结果跟踪
  2. MySQL连接管理:

    • 实现简单的连接池模式
    • 确保线程安全的数据库连接
    • 自动管理连接获取和释放
  3. 异常处理:

    • 网络请求异常处理
    • 数据库操作异常处理
    • 通用异常捕获和日志记录
  4. 日志记录:

    • 同时输出到文件和终端
    • 包含时间戳和错误详情
    • 线程安全的日志记录
  5. 性能优化:

    • 连接复用减少开销
    • 异步任务处理
    • 批量结果处理

使用说明:

  1. 安装依赖:
pip install requests pymysql
  1. 准备MySQL表:
CREATE TABLE splunk_results (
    id INT AUTO_INCREMENT PRIMARY KEY,
    session_id VARCHAR(255) NOT NULL,
    event_count INT,
    status VARCHAR(50),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
  1. 配置文件:
  • 修改SPLUNK_API_URL和请求头
  • 调整MySQL连接参数
  • 根据实际数据结构修改数据解析逻辑

优化建议:

  1. 速率限制:
# 在fetch_splunk_data中添加延迟
import time
time.sleep(0.1)  # 控制请求频率
  1. 重试机制:
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def fetch_splunk_data(session_id):
    # 原有代码
  1. 批量写入优化:
# 在process_session中改为批量插入
batch_size = 100
batch = []

def save_batch():
    with MySQLPool.get_connection() as conn:
        with conn.cursor() as cursor:
            sql = "INSERT ..."
            cursor.executemany(sql, batch)
            conn.commit()

# 在处理过程中积累数据
batch.append(data)
if len(batch) >= batch_size:
    save_batch()
    batch = []
  1. 异步IO版本:
    考虑使用aiohttp和asyncpg实现完全异步的版本,可进一步提升性能。

监控建议:

  1. 通过日志文件监控处理进度
  2. 添加Prometheus指标监控
  3. 实现进度条显示(使用tqdm库)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2374501.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android Framework

Android 分区 /boot:存放引导程序,包括内核和内存操作程序。/system:相当于电脑 C 盘,存放 Android 系统及系统应用。/recovery:恢复分区,可以进入该分区进行系统恢复。/data:用户数据区&#…

OpenMCU(六):STM32F103开发板功能介绍

概述 距上一篇关于STM32F103的FreeRTOS博客的发布已经过去很长时间没有更新了。在这段时间内,大家可以看到博主发表了一系列的关于使用qemu 模拟实现STM32F103的博客,博主本来想借助qemu开发stm32F103相关的一些软件功能,博主开发出来并成功运…

Rspack:字节跳动自研 Web 构建工具-基于 Rust打造高性能前端工具链

字节跳动开源了一款采用 Rust 开发的前端模块打包工具:Rspack(读音为 /ɑrspk/)。 据介绍,Rspack 是一个基于 Rust 的高性能构建引擎,具备与 Webpack 生态系统的互操作性,可以被 Webpack 项目低成本集成&a…

高速系统设计实例设计分析

在上几章的内容中,我们从纯粹高速信号的理论分析,到 Cadence 工具的具体使用都做了详细的讲解和介绍。相信读者通过前面章节的学习,已经对高速系统的设计理念及 Cadence 相应的设计流程和工具有了一个基本的认识。但是,对于高速电…

查看购物车

一.查看购物车 查看购物车使用get请求。我们要查看当前用户的购物车,就要获取当前用户的userId字段进行条件查询。因为在用户登录时就已经将userId封装在token中了,因此我们只需要解析token获取userId即可,不需要前端再传入参数了。 Control…

开发工具分享: Web前端编码常用的在线编译器

1.OneCompiler 工具网址:https://onecompiler.com/ OneCompiler支持60多种编程语言,在全球有超过1280万用户,让开发者可以轻易实现代码的编写、运行和共享。 OneCompiler的线上调试功能完全免费,对编程语言的覆盖也很全&#x…

智启未来:新一代云MSP管理服务助力企业实现云成本管理和持续优化

在数字化转型浪潮下,企业纷纷寻求更高效、更经济的运营方式。随着云计算技术的深入应用,云成本优化已成为企业普遍关注的核心议题。 过去,传统云运维服务往往依赖于人力外包,缺乏系统性、规范性的管理,难以有效降低云…

window 显示驱动开发-将虚拟地址映射到内存段(二)

在将虚拟地址映射到段的一部分之前,视频内存管理器调用显示微型端口驱动程序的 DxgkDdiAcquireSwizzlingRange 函数,以便驱动程序可以设置用于访问可能重排的分配位的光圈。 驱动程序既不能将偏移量更改为访问分配的 PCI 光圈,也不能更改分配…

【文心智能体】使用文心一言来给智能体设计一段稳定调用工作流的提示词

🌹欢迎来到《小5讲堂》🌹 🌹这是《文心智能体》系列文章,每篇文章将以博主理解的角度展开讲解。🌹 🌹温馨提示:博主能力有限,理解水平有限,若有不对之处望指正&#xff0…

K8S中构建双架构镜像-从零到成功

背景介绍 公司一个客户的项目使用的全信创的环境,服务器采用arm64的机器,而我们的应用全部是amd64的,于是需要对现在公司流水线进行arm64版本的同步镜像生成。本文介绍从最开始到最终生成双架构的全部过程,以及其中使用的相关配置…

c语言第一个小游戏:贪吃蛇小游戏03

我们为贪吃蛇的节点设置为一个结构体,构成贪吃蛇的身子的话我们使用链表,链表的每一个节点是一个结构体 显示贪吃蛇身子的一个节点 我们这边node就表示一个蛇的身体 就是一小节 输出结果如下 显示贪吃蛇完整身子 效果如下 代码实现 这个hasSnakeNode(…

​​​​​​​大规模预训练范式(Large-scale Pre-training)

大规模预训练指在巨量无标注数据上,通过自监督学习训练大参数量的基础模型,使其具备通用的表征与推理能力。其重要作用如下: 一 跨任务泛化 单一模型可在微调后处理多种NLP(自然语言处理)、CV(计算机视觉…

WPF之高级绑定技术

文章目录 引言多重绑定(MultiBinding)基本概念实现自定义IMultiValueConverterMultiBinding在XAML中的应用示例使用StringFormat简化MultiBinding 优先级绑定(PriorityBinding)基本概念PriorityBinding示例实现PriorityBinding的后…

调出事件查看器界面的4种方法

方法1. 方法2. 方法3. 方法4.

使用vite重构vue-cli的vue3项目

一、修改依赖 首先修改 package.json,修改启动方式与相应依赖 移除vue-cli并下载vite相关依赖,注意一些peerDependency如fast-glob需要手动下载 # 移除 vue-cli 相关依赖 npm remove vue/cli-plugin-babel vue/cli-plugin-eslint vue/cli-plugin-rout…

数据治理域——数据治理体系建设

摘要 本文主要介绍了数据治理系统的建设。数据治理对企业至关重要,其动因包括应对数据爆炸增长、提升内部管理效率、支撑复杂业务需求、加强风险防控与合规管理以及实现数字化转型战略。其核心目的是提升数据质量、统一数据标准、优化数据资产管理、支撑业务发展和…

onGAU:简化的生成式 AI UI界面,一个非常简单的 AI 图像生成器 UI 界面,使用 Dear PyGui 和 Diffusers 构建。

​一、软件介绍 文末提供程序和源码下载 onGAU:简化的生成式 AI UI界面开源程序,一个非常简单的 AI 图像生成器 UI 界面,使用 Dear PyGui 和 Diffusers 构建。 二、Installation 安装 文末下载后解压缩 Run install.py with python to setup…

【第52节】Windows编程必学之从零手写C++调试器下篇(仿ollydbg)

目录 一、引言 二、调试器核心功能设计与实现 三、断点功能 四、高级功能 五、附加功能 六、开发环境与实现概要 七、项目展示及完整代码参考 八、总结 一、引言 在软件开发领域,调试器是开发者不可或缺的工具。它不仅能帮助定位代码中的逻辑错误&#xff0…

uni-app学习笔记五--vue3插值表达式的使用

vue3快速上手导航&#xff1a;简介 | Vue.js 模板语法 插值表达式 最基本的数据绑定形式是文本插值&#xff0c;它使用的是“Mustache”语法 (即双大括号)&#xff1a; <span>Message: {{ msg }}</span> 双大括号标签会被替换为相应组件实例中 msg 属性的值。同…

C++类与对象(二):六个默认构造函数(一)

在学C语言时&#xff0c;实现栈和队列时容易忘记初始化和销毁&#xff0c;就会造成内存泄漏。而在C的类中我们忘记写初始化和销毁函数时&#xff0c;编译器会自动生成构造函数和析构函数&#xff0c;对应的初始化和在对象生命周期结束时清理资源。那是什么是默认构造函数呢&…