ES海量数据更新及导入导出备份

news2025/6/9 21:39:53

一、根据查询条件更新字段

from elasticsearch import Elasticsearch
import redis
import json

# 替换下面的用户名、密码和Elasticsearch服务器地址
username = 'elastic'
password = 'password '
es_host = 'https://127.0.0.2:30674'

# 使用Elasticsearch实例化时传递用户名和密码
es = Elasticsearch(
    hosts=[es_host],
    basic_auth=(username, password),
    verify_certs=False
    # 如果你的Elasticsearch是通过SSL加密的,还可以添加下面的参数
    # use_ssl=True,
    # verify_certs=True,
    # ca_certs='/path/to/ca/cert',
)

# 使用Elasticsearch实例进行操作,例如搜索
# print(es.cluster.state())

# response = es.search(index="remote_statistics_202409", query={"match_all": {}})
# print(response)

# # 连接Elasticsearch和Redis
# #es = Elasticsearch("http://localhost:9200")
r = redis.StrictRedis(host='10.7.9.13', port=32197, db=11)

# 假设你的Redis键是'my_key'
keys = r.keys()

count = 0

my_map = dict()
for key in keys:
    maps = r.hgetall(key)
    print(key)

    #list_map = dict()
    for key01, value in maps.items():
        # print(type(json.loads(key)))
        # print(type(value))
        # print(type(json.loads(value)))
        # print(json.loads(key),  json.loads(value))
        aa = json.loads(value).get("Latitude")
        if not aa.startswith("0."):
            # list_map[json.loads(key01)] = json.loads(value)
            my_map[key.decode('utf-8') + str(json.loads(key01))] = json.loads(value)

    # my_list.append(list_map)

    print(len(my_map))

    # print(type(my_map.get("21V70000110122B000139"+str(1719713910000))))
    # print(my_map.get("21V70000110122B000139"+str(1719713910000)))
    # print(my_map["21V70000110122B0001391719713910000"])

    # new_map={}
    # new_map["21V70000110122B000139"] = my_map["21V70000110122B0001391719713910000"]

    # for key02, value in my_map.items():
    #     print(key02)
    #     print(type(key02))
    #     print(type(value))
    #     count = len(value) + count
    #     print(count)

    # print(key)
    # print(maps.__len__())
    # count = count + maps.__len__()
    # print(count)
    #
    # one = r.hget(name=key, key='1719563700000')
    # print(one)
    # print(type(one))
    #
    # if one is not None:
    #     one_json = json.loads(one)
    #     print(type(one_json))
    #     print(one_json.get("Latitude"))
    #     print(one_json.get("Longitude"))
    #
    # print("-------------------------------------------------")

    # 从Redis获取数据
    # redis_data = r.get(redis_key)
    # key_map = my_map["21V70000110122B000139"]
    # print(type(key_map))
    # key_json = key_map.get(1719713910000)
    # print(type(key_json))
    # print(key_json)
    # print(key_json.get('Latitude'))

    # minutes = create_at // 60000
    # left = create_at % 60000 // 1000
    #
    # if left <= 15:
    #     left = 0
    # elif left > 45:
    #     left = 60
    # else:
    #     left = 30
    #
    # map_key = minutes * 60000 + left * 1000
    # 如果存在,解析数据并更新Elasticsearch
    # 构建更新查询
    script = {
        # "source": "ctx._source.field_to_update = params.new_value",
        # "source": "ctx._source['Latitude'] = params.new_value[ctx._source.RemoteId][ctx._source.createAt].get('Latitude')",
        # "source": "ctx._source['Latitude'] = params.new_value['21V70000110122B0001391719713910000']['Latitude'];ctx._source['Longitude'] = params.new_value['21V70000110122B0001391719713910000']['Longitude'];ctx._source['tmp'] = params.new_value['21V70000110122B0001391719713910000']",
        # "source": "def aa=ctx._source.RemoteId;ctx._source['Latitude'] = params.new_value['21V70000110122B0001391719713910000']['Latitude'];ctx._source['Longitude'] = params.new_value['21V70000110122B0001391719713910000']['Longitude'];ctx._source['rrc'] = params.new_value['21V70000110122B0001391719713910000'];ctx._source.remove('tmp')",
        "source": "def create_at=ctx._source.createAt;def minutes = Math.floor(create_at / 60000);"
                  "def left = Math.floor(create_at % 60000 / 1000);if(left<=15) {left=0} else if(left >45){ left=60} else {left=30} def form_time= minutes * 60000 + left * 1000;"
                  "ctx._source['create_lltime'] = form_time;def key = ctx._source.RemoteId + (Long)form_time ;def rru=  params.new_value[key];"
                  "if(rru !=null) {ctx._source['Latitude']=rru['Latitude'];ctx._source['Longitude']=rru['Longitude'];ctx._source['rrc'] = rru}",
        # "source": "def aa=ctx._source.RemoteId;ctx._source['Latitude'] = aa;ctx._source['Longitude'] = params.new_value['21V70000110122B0001391719713910000']['Longitude'];ctx._source['tmp'] = params.new_value['21V70000110122B0001391719713910000']",
        "params": {
            "new_value": my_map

        }
        , "lang": "painless"
    }
#
# # 更新查询
ret = es.update_by_query(
    index="remote_statistics_202406",
    script=script,
    # body={
    #     "query": {
    #         "match": {
    #             "id_field": my_list['id_value'],,,,,,,
    #         }
    #     }
    #
    # },
    slices="auto",
    wait_for_completion=False,
    conflicts="proceed"

)

print(ret)

二、ES数据批量导出

import json
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
import urllib3
urllib3.disable_warnings()
# 变量
start_date = 1735689600000
end_date = 1738368000000
#index_name = 'remote_statistics_202410'
index_name = 'remote_statistics_202501'

# 替换下面的用户名、密码和Elasticsearch服务器地址
username = 'elastic'
password = 'password'
es_host = 'https://127.0.0.1:32293'

# 使用Elasticsearch实例化时传递用户名和密码
es = Elasticsearch(
    hosts=[es_host],
    basic_auth=(username, password),
    verify_certs=False
    # 如果你的Elasticsearch是通过SSL加密的,还可以添加下面的参数
    # use_ssl=True,
    # verify_certs=True,
    # ca_certs='/path/to/ca/cert',
)



print("----------start---------------")

def fetch_data(start_time, end_time):
    results = helpers.scan(es, body={
        "query": {
            "range": {
                "createAt": {
                    "gte": start_time,
                    "lt": end_time
                }
            }
        }
    }, index=index_name)
    return results


if __name__ == "__main__":
    step = 60 * 60 * 1000

    for i in range(start_date, end_date+8*step, step):
        date = datetime.fromtimestamp(i / 1000)
        print("**********************************************************")
        print(date)
        print("**********************************************************")

        ret = fetch_data(i, i + step)
        count = 0
        with open(str(i) + '.json', 'w') as f:
            for doc in ret:
                f.write(json.dumps(doc['_source']) + '\n')
                count = count + 1
        print(count)

三、ES数据批量导入

import json
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import os
import urllib3
urllib3.disable_warnings()
# 变量
#index_name = 'remote_statistics_202410'
index_name = 'network_statistics_202410'

# 替换下面的用户名、密码和Elasticsearch服务器地址
username = 'elastic'
password = 'password '
es_host = 'https://127.0.0.1:32067'

# 使用Elasticsearch实例化时传递用户名和密码
es = Elasticsearch(
    hosts=[es_host],
    basic_auth=(username, password),
    verify_certs=False
    # 如果你的Elasticsearch是通过SSL加密的,还可以添加下面的参数
    # use_ssl=True,
    # verify_certs=True,
    # ca_certs='/path/to/ca/cert',
)

def bulk_index_file(idx_name, file_path):

    current_dir = os.getcwd()
    file_names = os.listdir(current_dir)

    for file_name in file_names:
        if file_name.endswith(".json"):
            with open(file_name, 'r') as file:
                try:
                    print(file_name)
                    actions = (json.loads(line) for line in file)
                    helpers.bulk(es, actions, index=idx_name)
                except Exception as e:
                    print("error-----------------------")
                    print(e)
                    print(file_name)


# 调用函数
bulk_index_file(index_name, None)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2405870.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JAVA-springboot log日志

SpringBoot从入门到精通-第8章 日志的操作 一、Spring Boot默认的日志框架 SpringBoot支持很多种日志框架&#xff0c;通常情况下&#xff0c;这些日志框架都是由一个日志抽象层和一个日志实现层搭建而成的&#xff0c;日志抽象层是为记录日志提供的一套标准且规范的框架&…

1.springmvc基础入门(一)

1.Spring MVC概念 Spring MVC 是 Spring Framework 提供的 Web 组件&#xff0c;全称是 Spring Web MVC&#xff0c;是⽬前主流的实现 MVC 设计模式的框架&#xff0c;提供前端路由映射、视图解析等功能。 Java Web 开发者必须要掌握的技术框架。 2.Spring MVC 功能 MVC&am…

模块缝合-把A模块换成B模块(没写完)

把MLP Head替换为KAN 1.在model文件下新建一个python文件 2.把 模块文件里的整个KAN代码复制到新的python文件中 3.在开头导入 from model.KAN(新建文件名&#xff09; import KAN&#xff08;新建文件中的类名&#xff09; 4.sys.path.append(r"D: Icode(Kansformer"…

从零开始学Flink:揭开实时计算的神秘面纱

一、为什么需要Flink&#xff1f; 当你在电商平台秒杀商品时&#xff0c;1毫秒的延迟可能导致交易失败&#xff1b;当自动驾驶汽车遇到障碍物时&#xff0c;10毫秒的计算延迟可能酿成事故。这些场景揭示了一个残酷事实&#xff1a;数据的价值随时间呈指数级衰减。 传统批处理…

Appium如何支持ios真机测试

ios模拟器上UI自动化测试 以appiumwebdriverio为例&#xff0c;详细介绍如何在模拟器上安装和测试app。在使用ios模拟器前&#xff0c;需要安装xcode&#xff0c;创建和启动一个simulator。simulator创建好后&#xff0c;就可以使用xcrun simctl命令安装被测应用并开始测试了。…

JDK17 Http Request 异步处理 源码刨析

为什么可以异步&#xff1f; #调用起始源码 // 3. 发送异步请求并处理响应 CompletableFuture future client.sendAsync( request, HttpResponse.BodyHandlers.ofString() // 响应体转为字符串 ).thenApply(response -> { // 状态码检查&#xff08;非200系列抛出异常&…

【Zephyr 系列 8】构建完整 BLE 产品架构:状态机 + AT 命令 + 双通道通信实战

🧠关键词:Zephyr、BLE、状态机、双向透传、AT 命令、Buffer、主从共存、系统架构 📌适合人群:希望开发 BLE 产品(模块/标签/终端)具备可控、可测、可维护架构的开发者 🧭 引言:从“点功能”到“系统架构” 前面几篇我们已经逐步构建了 BLE 广播、连接、数据透传系统…

【Mac 从 0 到 1 保姆级配置教程 16】- Docker 快速安装配置、常用命令以及实际项目演示

文章目录 前言1. Docker 是什么&#xff1f;2. 为什么要使用 Docker&#xff1f; 安装 Docker1. 安装 Docker Desktop2. 安装 OrbStack3. Docker Desktop VS OrbStack5. 验证安装 使用 Docker 运行项目1. 克隆项目到本地2. 进入项目目录3. 启动容器: 查看运行效果1. OrbStack 中…

2025-05-01-决策树算法及应用

决策树算法及应用 参考资料 GitHub - zhaoyichanghong/machine_learing_algo_python: implement the machine learning algorithms by p(机器学习相关的 github 仓库)决策树实现与应用决策树 概述 机器学习算法分类 决策树算法 决策树是一种以树状结构对数据进行划分的分类…

Redis知识体系

1. 概述 本文总结了Redis基本的核心知识体系&#xff0c;在学习Redis的过程中&#xff0c;可以将其作为学习框架&#xff0c;以此更好的从整体的角度去理解和学习Redis的内容和设计思想。同时知识框架带来的好处是可以帮助我们更好的进行记忆&#xff0c;在大脑中形成相应的知识…

mysql-MySQL体系结构和存储引擎

1. MySQL体系结构和存储引擎 MySQL被设计成一个单进程多线程架构的数据库&#xff0c;MySQL数据库实例在系统上的表现就是一个进 程当启动实例时&#xff0c;读取配置文件&#xff0c;根据配置文件的参数来启动数据库实例&#xff1b;若没有&#xff0c;按编译时的默认 参数设…

黑马Javaweb Request和Response

一.介绍 在 Web 开发中&#xff0c;HttpServletRequest 和 HttpServletResponse 是两个非常重要的类&#xff0c;它们分别用于处理客户端的请求和服务器的响应。以下是它们的详细说明和使用方法&#xff1a; 1. HttpServletRequest HttpServletRequest 是一个接口&#xff0…

Gerrit+repo管理git仓库,如果本地有新分支不能执行repo sync来同步远程所有修改,会报错

问题&#xff1a;创建一个本地分支TEST 来关联远程已有分支origin/TEST&#xff0c;直接执行repo sync可能会出现问题&#xff1a;比如&#xff0c;本地分支TES会错乱关联到origin/master&#xff0c;或者拉不下最新代码等问题。 // git checkout -b 新分支名 远程分支名字 git…

豆瓣图书评论数据分析与可视化

【题目描述】豆瓣图书评论数据爬取。以《平凡的世界》、《都挺好》等为分析对象&#xff0c;编写程序爬取豆瓣读书上针对该图书的短评信息&#xff0c;要求&#xff1a; &#xff08;1&#xff09;对前3页短评信息进行跨页连续爬取&#xff1b; &#xff08;2&#xff09;爬取…

Vue ④-组件通信 || 进阶语法

组件三大部分 template&#xff1a;只有能一个根元素 style&#xff1a;全局样式(默认)&#xff1a;影响所有组件。局部样式&#xff1a;scoped 下样式&#xff0c;只作用于当前组件 script&#xff1a;el 根实例独有&#xff0c;data 是一个函数&#xff0c;其他配置项一致…

从入门到实战:AI学习路线全解析——避坑指南

分享一下阿里的人工智能学习路线,为感兴趣系统学习的小伙伴们探路。 一、谁适合学这门AI课程?五类人群的精准定位 无论你是零基础小白还是职场转型者,这套系统化课程都能为你量身定制成长路径: 零基础爱好者(无编程/数学背景) 课程提供Python和数学前置学习建议,先补基…

uniapp实现的简约美观的星级评分组件

采用 uniapp 实现的一款简约美观的星级评分模板&#xff0c;提供丝滑动画效果&#xff0c;用户可根据自身需求进行自定义修改、扩展&#xff0c;纯CSS、HTML实现&#xff0c;支持web、H5、微信小程序&#xff08;其他小程序请自行测试&#xff09; 可到插件市场下载尝试&#x…

AWS Elastic Beanstalk + CodePipeline(Python Flask Web的国区CI/CD)

目标 需要使用AWS Elastic Beanstalk 部署一个Python的Flask Web应用&#xff0c;并且使用CodePipeline作为CI/CD工作流。 eb部署图 前提 假设你已经有一个能够正常运行的Python的Flask Web应用项目代码&#xff0c;而且需要对已有Flask工程做一些调整。由于AWS Elastic Bea…

多线程语音识别工具

软件介绍 本文介绍一款支持大厂接口的语音转文字工具&#xff0c;具备免配置、免费使用的特点。 软件特性 该工具是一款完全免费的桌面端应用程序&#xff0c;部署于开源社区平台&#xff0c;其核心优势在于整合了多家技术供应商的接口资源。 操作方式 用户只需将音频…

DiMTAIC 2024 数字医学技术及应用创新大赛-甲状腺B超静态及动态影像算法赛-参赛项目

参赛成绩 项目介绍 去年参加完这个比赛之后&#xff0c;整理了项目文件和代码&#xff0c;虽然比赛没有获奖&#xff0c;但是参赛过程中自己也很有收获&#xff0c;自己一个人搭建了完整的pipeline并基于此提交了多次提高成绩&#xff0c;现在把这个项目梳理成博客&#xff0c…