【OpenSearch】高性能 OpenSearch 数据导入

news2025/6/1 15:13:13

高性能 OpenSearch 数据导入

  • 1.导入依赖库
  • 2.配置参数
  • 3.OpenSearch 客户端初始化
  • 4.创建索引函数
  • 5.数据生成器
  • 6.批量处理函数
  • 7.主导入函数
    • 7.1 函数定义和索引创建
    • 7.2 优化索引设置(导入前)
    • 7.3 初始化变量和打印开始信息
    • 7.4 线程池设置
    • 7.5 主数据生成和导入循环
    • 7.6 批次提交条件判断
    • 7.7 进度显示
    • 7.8 数据量达标检查
    • 7.9 处理剩余批次
    • 7.10 等待所有任务完成
    • 7.11 恢复索引设置(导入后)
    • 7.12 段合并优化
    • 7.13 统计结果输出
  • 8.问题分析
    • 8.1 问题根源分析
    • 8.2 修正后的代码方案
      • 方案1:持续生成文档直到达到目标大小
      • 方案2:预先计算需要的文档数量
    • 8.3 关键修改点说明
    • 8.4 其他可能的优化
  • 9.性能优化关键点
  • 10.使用建议

以下是一个高性能的 Python 脚本,用于向 OpenSearch 实例批量导入约 10GB 的数据。这个脚本使用了最佳实践来优化写入性能。

import json
import gzip
import time
from concurrent.futures import ThreadPoolExecutor
from opensearchpy import OpenSearch, helpers
from faker import Faker
import random
from datetime import datetime, timedelta

# 配置 OpenSearch 连接
OPENSEARCH_HOSTS = [{'host': 'localhost', 'port': 9200}]  # 替换为你的 OpenSearch 实例地址
OPENSEARCH_AUTH = ('admin', 'admin')  # 替换为你的认证信息
INDEX_NAME = 'large_data_index'
BULK_SIZE = 1000  # 每批次文档数量
THREAD_COUNT = 8   # 并发线程数
TARGET_DATA_SIZE = 10 * 1024 * 1024 * 1024  # 10GB 目标数据量

# 初始化 OpenSearch 客户端
client = OpenSearch(
    hosts=OPENSEARCH_HOSTS,
    http_auth=OPENSEARCH_AUTH,
    use_ssl=False,  # 根据你的配置调整
    verify_certs=False,
    timeout=30,
    max_retries=10,
    retry_on_timeout=True
)

# 创建索引(如果不存在)
def create_index():
    if not client.indices.exists(INDEX_NAME):
        index_body = {
            "settings": {
                "index": {
                    "number_of_shards": 5,  # 根据你的集群规模调整
                    "number_of_replicas": 1,
                    "refresh_interval": "30s",  # 导入期间减少刷新频率
                }
            },
            "mappings": {
                "properties": {
                    "user_id": {"type": "keyword"},
                    "name": {"type": "text"},
                    "email": {"type": "keyword"},
                    "age": {"type": "integer"},
                    "address": {"type": "text"},
                    "is_active": {"type": "boolean"},
                    "created_at": {"type": "date"},
                    "transaction_amount": {"type": "double"},
                    "tags": {"type": "keyword"},
                    "description": {"type": "text", "index": False}  # 不索引大文本字段
                }
            }
        }
        client.indices.create(INDEX_NAME, body=index_body)

# 生成模拟数据的生成器
def generate_documents(num_docs):
    fake = Faker()
    start_date = datetime.now() - timedelta(days=365*5)
    
    for _ in range(num_docs):
        created_at = start_date + timedelta(days=random.randint(0, 365*5))
        
        doc = {
            "user_id": fake.uuid4(),
            "name": fake.name(),
            "email": fake.email(),
            "age": random.randint(18, 80),
            "address": fake.address().replace('\n', ', '),
            "is_active": random.choice([True, False]),
            "created_at": created_at.isoformat(),
            "transaction_amount": round(random.uniform(1.0, 10000.0), 2),
            "tags": random.sample(["premium", "standard", "vip", "new", "old", "active", "inactive"], 2),
            "description": fake.text(max_nb_chars=200)
        }
        yield doc

# 批量处理函数
def process_batch(batch):
    actions = [
        {
            "_op_type": "index",
            "_index": INDEX_NAME,
            "_source": doc
        }
        for doc in batch
    ]
    
    try:
        success, failed = helpers.bulk(
            client,
            actions,
            chunk_size=BULK_SIZE,
            request_timeout=60,
            raise_on_error=False
        )
        return len(actions), failed
    except Exception as e:
        print(f"批量写入失败: {str(e)}")
        return 0, len(actions)

# 主导入函数
def import_data():
    create_index()
    
    # 临时调整索引设置以提高写入性能
    client.indices.put_settings(index=INDEX_NAME, body={
        "index": {
            "refresh_interval": "-1",  # 禁用刷新
            "number_of_replicas": "0"   # 禁用副本
        }
    })
    
    print("开始数据导入...")
    start_time = time.time()
    total_docs = 0
    total_bytes = 0
    batch = []
    avg_doc_size = 0
    
    # 使用线程池并行处理
    with ThreadPoolExecutor(max_workers=THREAD_COUNT) as executor:
        futures = []
        
        for doc in generate_documents(10**6):  # 生成足够多的文档
            batch.append(doc)
            
            # 估算文档大小
            if avg_doc_size == 0:
                avg_doc_size = len(json.dumps(doc).encode('utf-8'))
            
            # 当批次达到大小时提交
            if len(batch) >= BULK_SIZE or total_bytes + (len(batch) * avg_doc_size) >= TARGET_DATA_SIZE:
                futures.append(executor.submit(process_batch, batch.copy()))
                total_docs += len(batch)
                total_bytes += len(batch) * avg_doc_size
                batch = []
                
                # 显示进度
                if len(futures) % 10 == 0:
                    processed = sum(f.result()[0] for f in futures if f.done())
                    print(f"进度: {processed:,} 文档, {total_bytes/(1024**3):.2f} GB")
                
                if total_bytes >= TARGET_DATA_SIZE:
                    break
        
        # 处理剩余批次
        if batch:
            futures.append(executor.submit(process_batch, batch))
            total_docs += len(batch)
            total_bytes += len(batch) * avg_doc_size
        
        # 等待所有任务完成
        for future in futures:
            processed, failed = future.result()
            if failed > 0:
                print(f"警告: {failed} 文档写入失败")
    
    # 恢复索引设置
    client.indices.put_settings(index=INDEX_NAME, body={
        "index": {
            "refresh_interval": "30s",
            "number_of_replicas": "1"
        }
    })
    
    # 强制合并段以优化存储
    client.indices.forcemerge(index=INDEX_NAME, max_num_segments=5)
    
    end_time = time.time()
    duration = end_time - start_time
    print(f"\n导入完成!")
    print(f"总文档数: {total_docs:,}")
    print(f"总数据量: {total_bytes/(1024**3):.2f} GB")
    print(f"耗时: {duration:.2f} 秒")
    print(f"写入速度: {total_docs/duration:,.0f} 文档/秒")
    print(f"吞吐量: {total_bytes/(1024**2)/duration:.2f} MB/秒")

if __name__ == "__main__":
    import_data()

1.导入依赖库

pip install opensearch-py faker
import json
import gzip
import time
from concurrent.futures import ThreadPoolExecutor
from opensearchpy import OpenSearch, helpers
from faker import Faker
import random
from datetime import datetime, timedelta
  • json:用于处理 JSON 格式数据。
  • gzip:可选,用于压缩数据(虽然脚本中未实际使用)。
  • time:计算导入耗时。
  • ThreadPoolExecutor:实现多线程并发处理。
  • OpenSearchhelpers:OpenSearch Python 客户端库。
  • Faker:生成模拟测试数据。
  • randomdatetime:生成随机数据和日期。

2.配置参数

OPENSEARCH_HOSTS = [{'host': 'localhost', 'port': 9200}]
OPENSEARCH_AUTH = ('admin', 'xxxxx')
INDEX_NAME = 'large_data_index'
BULK_SIZE = 1000
THREAD_COUNT = 8
TARGET_DATA_SIZE = 10 * 1024 * 1024 * 1024
  • 连接参数:主机、端口、认证信息。
  • 目标索引名称。
  • 批量大小:每次批量写入的文档数。
  • 线程数:并发处理的工作线程数。
  • 目标数据量:10GB(用于控制导入总量)。

3.OpenSearch 客户端初始化

client = OpenSearch(
    hosts=OPENSEARCH_HOSTS,
    http_auth=OPENSEARCH_AUTH,
    use_ssl=False,
    verify_certs=False,
    timeout=30,
    max_retries=10,
    retry_on_timeout=True
)

配置了客户端连接参数,包括:

  • 禁用 SSL(仅用于测试环境)。
  • 30 秒超时。
  • 最大重试次数。
  • 超时后自动重试。

4.创建索引函数

def create_index():
    if not client.indices.exists(INDEX_NAME):
        index_body = {
            "settings": {
                "index": {
                    "number_of_shards": 5,  # 根据你的集群规模调整
                    "number_of_replicas": 1,
                    "refresh_interval": "30s",  # 导入期间减少刷新频率
                }
            },
            "mappings": {
                "properties": {
                    "user_id": {"type": "keyword"},
                    "name": {"type": "text"},
                    "email": {"type": "keyword"},
                    "age": {"type": "integer"},
                    "address": {"type": "text"},
                    "is_active": {"type": "boolean"},
                    "created_at": {"type": "date"},
                    "transaction_amount": {"type": "double"},
                    "tags": {"type": "keyword"},
                    "description": {"type": "text", "index": False}  # 不索引大文本字段
                }
            }
        }
        client.indices.create(INDEX_NAME, body=index_body)
  • 检查索引是否存在,不存在则创建。
  • 配置了分片数、副本数和刷新间隔。
  • 定义了字段映射(数据类型等)。

5.数据生成器

def generate_documents(num_docs):
    fake = Faker()
    start_date = datetime.now() - timedelta(days=365*5)
    
    for _ in range(num_docs):
        created_at = start_date + timedelta(days=random.randint(0, 365*5))
        
        doc = {
            "user_id": fake.uuid4(),
            "name": fake.name(),
            "email": fake.email(),
            "age": random.randint(18, 80),
            "address": fake.address().replace('\n', ', '),
            "is_active": random.choice([True, False]),
            "created_at": created_at.isoformat(),
            "transaction_amount": round(random.uniform(1.0, 10000.0), 2),
            "tags": random.sample(["premium", "standard", "vip", "new", "old", "active", "inactive"], 2),
            "description": fake.text(max_nb_chars=200)
        }
        yield doc
  • 使用 Faker 生成真实模拟数据。
  • 生成器模式按需产生文档,节省内存。
  • 创建随机日期、ID、文本等数据。

6.批量处理函数

def process_batch(batch):
    actions = [
        {
            "_op_type": "index",
            "_index": INDEX_NAME,
            "_source": doc
        }
        for doc in batch
    ]
    
    try:
        success, failed = helpers.bulk(
            client,
            actions,
            chunk_size=BULK_SIZE,
            request_timeout=60,
            raise_on_error=False
        )
        return len(actions), failed
    except Exception as e:
        print(f"批量写入失败: {str(e)}")
        return 0, len(actions)
  • 将文档列表转换为批量操作格式。
  • 使用 helpers.bulk 高效执行批量操作。
  • 处理可能的错误,并返回成功/失败计数。

7.主导入函数

def import_data():
    create_index()
    
    # 临时调整索引设置以提高写入性能
    client.indices.put_settings(index=INDEX_NAME, body={
        "index": {
            "refresh_interval": "-1",  # 禁用刷新
            "number_of_replicas": "0"   # 禁用副本
        }
    })
    
    print("开始数据导入...")
    start_time = time.time()
    total_docs = 0
    total_bytes = 0
    batch = []
    avg_doc_size = 0
    
    # 使用线程池并行处理
    with ThreadPoolExecutor(max_workers=THREAD_COUNT) as executor:
        futures = []
        
        for doc in generate_documents(10**6):  # 生成足够多的文档
            batch.append(doc)
            
            # 估算文档大小
            if avg_doc_size == 0:
                avg_doc_size = len(json.dumps(doc).encode('utf-8'))
            
            # 当批次达到大小时提交
            if len(batch) >= BULK_SIZE or total_bytes + (len(batch) * avg_doc_size) >= TARGET_DATA_SIZE:
                futures.append(executor.submit(process_batch, batch.copy()))
                total_docs += len(batch)
                total_bytes += len(batch) * avg_doc_size
                batch = []
                
                # 显示进度
                if len(futures) % 10 == 0:
                    processed = sum(f.result()[0] for f in futures if f.done())
                    print(f"进度: {processed:,} 文档, {total_bytes/(1024**3):.2f} GB")
                
                if total_bytes >= TARGET_DATA_SIZE:
                    break
        
        # 处理剩余批次
        if batch:
            futures.append(executor.submit(process_batch, batch))
            total_docs += len(batch)
            total_bytes += len(batch) * avg_doc_size
        
        # 等待所有任务完成
        for future in futures:
            processed, failed = future.result()
            if failed > 0:
                print(f"警告: {failed} 文档写入失败")
    
    # 恢复索引设置
    client.indices.put_settings(index=INDEX_NAME, body={
        "index": {
            "refresh_interval": "30s",
            "number_of_replicas": "1"
        }
    })
    
    # 强制合并段以优化存储
    client.indices.forcemerge(index=INDEX_NAME, max_num_segments=5)
    
    end_time = time.time()
    duration = end_time - start_time
    print(f"\n导入完成!")
    print(f"总文档数: {total_docs:,}")
    print(f"总数据量: {total_bytes/(1024**3):.2f} GB")
    print(f"耗时: {duration:.2f} 秒")
    print(f"写入速度: {total_docs/duration:,.0f} 文档/秒")
    print(f"吞吐量: {total_bytes/(1024**2)/duration:.2f} MB/秒")

7.1 函数定义和索引创建

def import_data():
    create_index()
  • 定义主导入函数 import_data()
  • 首先调用 create_index() 函数确保目标索引存在。

7.2 优化索引设置(导入前)

    # 临时调整索引设置以提高写入性能
    client.indices.put_settings(index=INDEX_NAME, body={
        "index": {
            "refresh_interval": "-1",  # 禁用刷新
            "number_of_replicas": "0"   # 禁用副本
        }
    })
  • 使用 put_settings 临时修改索引配置。
  • refresh_interval="-1":禁用自动刷新,减少写入开销。
  • number_of_replicas="0":禁用副本,提高写入速度。

7.3 初始化变量和打印开始信息

    print("开始数据导入...")
    start_time = time.time()
    total_docs = 0
    total_bytes = 0
    batch = []
    avg_doc_size = 0
  • 打印开始信息。
  • 记录开始时间用于计算总耗时。
  • 初始化计数器:总文档数、总字节数。
  • 初始化批量列表和平均文档大小变量。

7.4 线程池设置

    # 使用线程池并行处理
    with ThreadPoolExecutor(max_workers=THREAD_COUNT) as executor:
        futures = []
  • 创建线程池,最大线程数为 THREAD_COUNT(之前定义为 8)。
  • 初始化 futures 列表用于跟踪异步任务。

7.5 主数据生成和导入循环

        for doc in generate_documents(10**6):  # 生成足够多的文档
            batch.append(doc)
            
            # 估算文档大小
            if avg_doc_size == 0:
                avg_doc_size = len(json.dumps(doc).encode('utf-8'))
  • 遍历数据生成器产生的文档。
  • 将文档添加到当前批次。
  • 首次循环时计算单个文档的平均大小(字节数)。

7.6 批次提交条件判断

            # 当批次达到大小时提交
            if len(batch) >= BULK_SIZE or total_bytes + (len(batch) * avg_doc_size) >= TARGET_DATA_SIZE:
                futures.append(executor.submit(process_batch, batch.copy()))
                total_docs += len(batch)
                total_bytes += len(batch) * avg_doc_size
                batch = []
  • 当批次达到 BULK_SIZE 1000 1000 1000)或总数据量接近目标时:
    • 提交批次到线程池处理。
    • 更新总文档数和总字节数。
    • 清空当前批次。

7.7 进度显示

                # 显示进度
                if len(futures) % 10 == 0:
                    processed = sum(f.result()[0] for f in futures if f.done())
                    print(f"进度: {processed:,} 文档, {total_bytes/(1024**3):.2f} GB")
  • 每完成 10 个批次显示一次进度。
  • 计算已处理的文档数和数据量(GB)。

7.8 数据量达标检查

                if total_bytes >= TARGET_DATA_SIZE:
                    break
  • 如果总数据量达到目标(10GB),退出循环。

7.9 处理剩余批次

        # 处理剩余批次
        if batch:
            futures.append(executor.submit(process_batch, batch))
            total_docs += len(batch)
            total_bytes += len(batch) * avg_doc_size
  • 循环结束后处理最后未满的批次。

7.10 等待所有任务完成

        # 等待所有任务完成
        for future in futures:
            processed, failed = future.result()
            if failed > 0:
                print(f"警告: {failed} 文档写入失败")
  • 等待所有异步任务完成。
  • 检查并报告失败的文档。

7.11 恢复索引设置(导入后)

    # 恢复索引设置
    client.indices.put_settings(index=INDEX_NAME, body={
        "index": {
            "refresh_interval": "30s",
            "number_of_replicas": "1"
        }
    })
  • 恢复正常的刷新间隔(30秒)。
  • 重新启用副本(1个)。

7.12 段合并优化

    # 强制合并段以优化存储
    client.indices.forcemerge(index=INDEX_NAME, max_num_segments=5)
  • 强制合并 Lucene 段文件,减少段数量(至多 5 个)。
  • 提高后续查询性能。

7.13 统计结果输出

    end_time = time.time()
    duration = end_time - start_time
    print(f"\n导入完成!")
    print(f"总文档数: {total_docs:,}")
    print(f"总数据量: {total_bytes/(1024**3):.2f} GB")
    print(f"耗时: {duration:.2f} 秒")
    print(f"写入速度: {total_docs/duration:,.0f} 文档/秒")
    print(f"吞吐量: {total_bytes/(1024**2)/duration:.2f} MB/秒")

这个函数精心设计了数据流和控制逻辑,确保在保持高性能的同时,也具有良好的可靠性和可观测性。

8.问题分析

如果实践了的同学会发现,程序可能在未达到 10GB 数据量前就停止了,原因在于数据生成逻辑的设计问题。让我们详细分析原因并提供修复方案:

8.1 问题根源分析

在当前的 generate_documents() 函数中:

def generate_documents(num_docs):  # num_docs=10**6
    for _ in range(num_docs):  # 只生成100万文档
        yield doc
  • 固定文档数量限制:函数只生成固定数量(100万)的文档,而不是持续生成直到达到 10GB。
  • 文档大小估算不准确:脚本使用首个文档大小估算所有文档大小,但实际文档可能有大小差异。
  • 循环终止条件:主循环会在生成器耗尽或达到 10GB 时停止,但生成器先耗尽了。

8.2 修正后的代码方案

方案1:持续生成文档直到达到目标大小

def import_data():
    # ...前面的初始化代码不变...
    
    with ThreadPoolExecutor(max_workers=THREAD_COUNT) as executor:
        futures = []
        
        # 修改为无限循环,通过break条件退出
        for doc in generate_documents(float('inf')):  # 无限生成文档
            batch.append(doc)
            
            # 实时计算当前文档大小,而不是估算
            doc_size = len(json.dumps(doc).encode('utf-8'))
            total_bytes += doc_size
            
            if len(batch) >= BULK_SIZE or total_bytes >= TARGET_DATA_SIZE:
                futures.append(executor.submit(process_batch, batch.copy()))
                total_docs += len(batch)
                batch = []
                
                # 显示进度(使用实际数据量)
                if len(futures) % 10 == 0:
                    processed = sum(f.result()[0] for f in futures if f.done())
                    print(f"进度: {processed:,} 文档, {total_bytes/(1024**3):.2f} GB")
                
                if total_bytes >= TARGET_DATA_SIZE:
                    break  # 达到目标数据量才退出
    
    # ...后续恢复设置和统计代码不变...

方案2:预先计算需要的文档数量

def calculate_required_docs():
    # 生成样本文档计算平均大小
    sample_doc = next(generate_documents(1))
    avg_size = len(json.dumps(sample_doc).encode('utf-8'))
    return int(TARGET_DATA_SIZE / avg_size)

def import_data():
    required_docs = calculate_required_docs()
    
    with ThreadPoolExecutor(max_workers=THREAD_COUNT) as executor:
        # 修改为使用精确计算的文档数量
        for doc in generate_documents(required_docs):
            # ...其余处理逻辑不变...

8.3 关键修改点说明

  • 无限生成器generate_documents(float('inf')) 会持续生成文档直到外部中断。
  • 精确大小计算:实时计算每个文档的大小,而不是使用首个文档估算。
  • 严格大小检查:只有达到目标数据量才会退出循环。
  • 内存安全:仍然保持批处理模式,避免内存爆炸。

8.4 其他可能的优化

  1. 动态调整批量大小:根据文档实际大小动态调整 BULK_SIZE

    if avg_doc_size > 1024:  # 如果文档较大
        BULK_SIZE = 500      # 减少批量大小
    else:
        BULK_SIZE = 1000
    
  2. 更精确的进度显示

    print(f"进度: {total_docs:,} 文档, {total_bytes/(1024**3):.2f}/{TARGET_DATA_SIZE/(1024**3):.2f} GB")
    
  3. 提前终止检查:在长时间运行中增加键盘中断检查。

    try:
        # 主循环代码
    except KeyboardInterrupt:
        print("\n用户中断,正在保存进度...")
    

选择哪种方案取决于你的具体需求:

  • 方案 1 更适合精确控制总数据量。
  • 方案 2 更适合预先知道需要多少文档的场景。

两种方案都能确保生成足够 10GB 的数据后才停止程序。

9.性能优化关键点

  • 批量处理:减少网络往返次数。
  • 多线程:充分利用 CPU 和网络资源。
  • 临时禁用刷新和副本:减少写入开销。
  • 生成器模式:避免内存爆炸。
  • 合理的超时和重试:提高稳定性。
  • 后期优化:强制合并段提高查询性能。

10.使用建议

  • 根据集群规模调整分片数。
  • 根据网络和硬件调整批量大小和线程数。
  • 生产环境应启用 SSL 和正确证书。
  • 大数据量导入建议分多次进行。

这个脚本设计用于高效导入大量数据到 OpenSearch,同时保持代码清晰和可维护性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2392759.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AWS EC2 实例告警的创建与删除

在AWS云环境中,监控EC2实例的运行状态至关重要。通过CloudWatch告警,用户可以实时感知实例的CPU、网络、磁盘等关键指标异常。本文将详细介绍如何通过AWS控制台创建EC2实例告警,以及如何安全删除不再需要的告警规则,并附操作截图与…

STM32 搭配 嵌入式SD卡在智能皮电手环中的应用全景评测

在智能皮电手环及数据存储技术不断迭代的当下,主控 MCU STM32H750 与存储 SD NAND MKDV4GIL-AST 的强强联合,正引领行业进入全新发展阶段。二者凭借低功耗、高速读写与卓越稳定性的深度融合,以及高容量低成本的突出优势,成为大规模…

黑马点评项目01——短信登录以及登录校验的细节

1.短信登录 1.1 Session方式实现 前端点击发送验证码,后端生成验证码后,向session中存放键值对,键是"code",值是验证码;然后,后端生成sessionID以Cookie的方式发给前端,前端拿到后&a…

【笔记】Windows 系统安装 Scoop 包管理工具

#工作记录 一、问题背景 在进行开源项目 Suna 部署过程中,执行设置向导时遭遇报错:❌ Supabase CLI is not installed. 根据资料检索,需通过 Windows 包管理工具Scoop安装 Supabase CLI。 初始尝试以管理员身份运行 PowerShell 安装 Scoop…

MySQL之约束和表的增删查改

MySQL之约束和表的增删查改 一.数据库约束1.1数据库约束的概念1.2NOT NULL 非空约束1.3DEFAULT 默认约束1.4唯一约束1.5主键约束和自增约束1.6自增约束1.7外键约束1.8CHECK约束 二.表的增删查改2.1Create创建2.2Retrieve读取2.3Update更新2.4Delete删除和Truncate截断 一.数据库…

Oracle数据库性能优化的最佳实践

原创:厦门微思网络 以下是 Oracle 数据库性能优化的最佳实践,涵盖设计、SQL 优化、索引管理、系统配置等关键维度,帮助提升数据库响应速度和稳定性: 一、SQL 语句优化 1. 避免全表扫描(Full Table Scan)…

汽配快车道:助力汽车零部件行业的产业重构与数字化出海

汽配快车道:助力汽车零部件行业的数字化升级与出海解决方案。 在当今快速发展的汽车零部件市场中,随着消费者对汽车性能、安全和舒适性的要求不断提高,汽车刹车助力系统作为汽车安全的关键部件之一,其市场需求也在持续增长。汽车…

Windows 11 家庭版 安装Docker教程

Windows 家庭版需要通过脚本手动安装 Hyper-V 一、前置检查 1、查看系统 快捷键【winR】,输入“control” 【控制面板】—>【系统和安全】—>【系统】 2、确认虚拟化 【任务管理器】—【性能】 二、安装Hyper-V 1、创建并运行安装脚本 在桌面新建一个 .…

PyQt6基础_QtCharts绘制横向柱状图

前置: pip install PyQt6-Charts 结果: 代码: import sysfrom PyQt6.QtCharts import (QBarCategoryAxis, QBarSet, QChart,QChartView, QValueAxis,QHorizontalBarSeries) from PyQt6.QtCore import Qt,QSize from PyQt6.QtGui import QP…

《TCP/IP 详解 卷1:协议》第2章:Internet 地址结构

基本的IP地址结构 分类寻址 早期Internet采用分类地址(Classful Addressing),将IPv4地址划分为五类: A类和B类网络号通常浪费太多主机号,而C类网络号不能为很多站点提供足够的主机号。 子网寻址 子网(Su…

如何通过一次需求评审,让项目效率提升50%?

想象一下,你的团队启动了一个新项目,但需求模糊不清,开发到一半才发现方向错了,返工、加班、客户投诉接踵而至……听起来像噩梦?一次完美的需求评审就能避免这一切!它就像项目的“导航仪”,确保…

再见Notepad++,你好Notepad--

Notepad-- 是一款国产开源的轻量级、跨平台文本编辑器,支持 Window、Linux、macOS 以及国产 UOS、麒麟等操作系统。 除了具有常用编辑器的功能之外,Notepad-- 还内置了专业级的代码对比功能,支持文件、文件夹、二进制文件的比对,支…

element-plus bug整理

1.el-table嵌入el-image标签预览时&#xff0c;显示错乱 解决&#xff1a;添加preview-teleported属性 <el-table-column label"等级图标" align"center" prop"icon" min-width"80"><template #default"scope"&g…

技术-工程-管用养修保-智能硬件-智能软件五维黄金序位模型

融智学工程技术体系&#xff1a;五维协同架构 基于邹晓辉教授的框架&#xff0c;工程技术体系重构为&#xff1a;技术-工程-管用养修保-智能硬件-智能软件五维黄金序位模型&#xff1a; math \mathbb{E}_{\text{技}} \underbrace{\prod_{\text{Dis}} \text{TechnoCore}}_{\…

LangChain-自定义Tool和Agent结合DeepSeek应用实例

除了调用LangChain内置工具外&#xff0c;也可以自定义工具 实例1&#xff1a; 自定义多个工具 from langchain.agents import initialize_agent, AgentType from langchain_community.agent_toolkits.load_tools import load_tools from langchain_core.tools import tool, …

用 3D 可视化颠覆你的 JSON 数据体验

大家好&#xff0c;这里是架构资源栈&#xff01;点击上方关注&#xff0c;添加“星标”&#xff0c;一起学习大厂前沿架构&#xff01; 复杂的 JSON 数据结构常常让人头疼&#xff1a;层层嵌套的对象、错综复杂的数组关系&#xff0c;用传统的树状视图或表格一览千头万绪&…

MVCC(多版本并发控制)机制

1. MVCC&#xff08;多版本并发控制&#xff09;机制 MVCC 的核心就是 Undo Log Read View&#xff0c;“MV”就是通过 Undo Log 来保存数据的历史版本&#xff0c;实现多版本的管理&#xff0c;“CC”是通过 Read View 来实现管理&#xff0c;通过 Read View 原则来决定数据是…

Mac M1 安装 ffmpeg

1.前言 官网那货没有准备m系列的静态包&#xff0c;然后我呢&#xff0c;不知道怎么想的就从maven项目中的 javacv-platform&#xff0c;且版本为1.5.11依赖里面将这个静态包把了出来&#xff0c;亲测能用&#xff0c;感觉比那些网上说的用什么wget编译安装、brew安装快多了。…

Spring框架学习day3--Spring数据访问层管理(IOC)

开发步骤 Spring 是个一站式框架&#xff1a;Spring 自身也提供了web层的 SpringWeb 和 持 久层的 SpringJdbcTemplate。 开发步骤 1.导入jar包 pom.xml <!-- spring-jdbc--> <dependency><groupId>org.springframework</groupId><artifactId>…

重读《人件》Peopleware -(13)Ⅱ 办公环境 Ⅵ 电话

当你开始收集有关工作时间质量的数据时&#xff0c;你的注意力自然会集中在主要的干扰源之一——打进来的电话。一天内接15个电话并不罕见。虽然这看似平常&#xff0c;但由于重新沉浸所需的时间&#xff0c;它可能会耗尽你几乎一整天的时间。当一天结束时&#xff0c;你会纳闷…