迁移临时数据脚本
打开 PowerShell,输入命令:

```powershell
powershell -ExecutionPolicy Bypass -File xxx.ps1
```

这句 PowerShell 命令的作用是:临时允许执行脚本文件,并且运行指定的 .ps1 脚本。

1. 每个部分是什么意思

- powershell:启动 PowerShell 环境
- -ExecutionPolicy Bypass:临时关闭执行策略限制。Windows 默认禁止直接运行陌生 .ps1 脚本,加这句就是“这次先不管安全限制,允许运行脚本”
- -File 文件名.ps1:指定要运行的 PowerShell 脚本文件

2. 常见正常用途

```powershell
# 基础使用(默认路径 + 自动备份 + 检查进程)
.\migrate_temp_data.ps1

# 指定根路径
.\migrate_temp_data.ps1 -RootPath D:\your-app-root

# 跳过备份 + 跳过进程检查
.\migrate_temp_data.ps1 -SkipBackup -SkipProcessCheck
```

- powershell:启动 PowerShell 环境
- -ExecutionPolicy Bypass:临时关闭执行策略限制。Windows 默认禁止直接运行陌生 .ps1 脚本,加这句就是“这次先不管安全限制,允许运行脚本”
- -File 文件名.ps1:指定要运行的 PowerShell 脚本文件

以绕过系统安全策略的方式,直接运行 xxx.ps1 这个 PowerShell 脚本。

3. 安全提醒(非常重要)

- 这是高权限操作,可以绕过系统对脚本的安全拦截
- 如果你不知道这个脚本是干嘛的,不要随便运行
- 很多病毒、木马、流氓软件也会用这种方式偷偷执行恶意脚本

- 运行自己写的批量自动化脚本
- 安装 / 配置软件、部署环境
- 系统维护、批量修改设置

迁移临时数据脚本

```powershell
[CmdletBinding()]
param(
    [Parameter(Mandatory = $false)]
    [string]$RootPath = $PSScriptRoot,
    [switch]$SkipBackup,
    [switch]$SkipProcessCheck
)

Set-StrictMode -Version Latest
$ErrorActionPreference = "Stop"

if ([string]::IsNullOrWhiteSpace($RootPath)) {
    $RootPath = (Get-Location).Path
}

function Write-Log {
    param(
        [Parameter(Mandatory = $true)][string]$Message,
        [ValidateSet("INFO", "WARN", "ERROR")]
        [string]$Level = "INFO"
    )
    $line = "[{0}] {1,-5} {2}" -f (Get-Date -Format "yyyy-MM-dd HH:mm:ss"), $Level, $Message
    Write-Host $line
}

function Resolve-FullPath {
    param(
        [Parameter(Mandatory = $true)][string]$PathValue,
        [switch]$AllowMissing
    )
    $resolved = Resolve-Path -LiteralPath $PathValue -ErrorAction SilentlyContinue
    if ($resolved) {
        return $resolved.Path
    }
    if ($AllowMissing) {
        return [System.IO.Path]::GetFullPath($PathValue)
    }
    throw "Path not found: $PathValue"
}

function Get-PythonCommand {
    $candidates = @(
        @{ Exe = "py"; Args = @("-3") },
        @{ Exe = "python"; Args = @() },
        @{ Exe = "python3"; Args = @() }
    )
    foreach ($candidate in $candidates) {
        $cmd = Get-Command $candidate.Exe -ErrorAction SilentlyContinue
        if (-not $cmd) {
            continue
        }
        try {
            & $candidate.Exe @($candidate.Args + @("--version")) *> $null
            if ($LASTEXITCODE -eq 0) {
                return $candidate
            }
        } catch {
            continue
        }
    }
    return $null
}

function Test-FileLocked {
    param([Parameter(Mandatory = $true)][string]$PathValue)
    if (-not (Test-Path -LiteralPath $PathValue)) {
        return $false
    }
    try {
        $stream = [System.IO.File]::Open(
            $PathValue,
            [System.IO.FileMode]::Open,
            [System.IO.FileAccess]::ReadWrite,
            [System.IO.FileShare]::None
        )
        $stream.Close()
        return $false
    } catch {
        return $true
    }
}

function Invoke-RobocopyMerge {
    param(
        [Parameter(Mandatory = $true)][string]$Source,
        [Parameter(Mandatory = $true)][string]$Destination
    )
    New-Item -ItemType Directory -Path $Destination -Force | Out-Null
    # /E copy all subdirs including empty; merge into destination without deleting extras.
    robocopy $Source $Destination /E /COPY:DAT /R:2 /W:1 /NFL /NDL /NJH /NJS /NP | Out-Null
    $code = $LASTEXITCODE
    if ($code -gt 7) {
        throw "robocopy failed with exit code $code while copying $Source -> $Destination"
    }
}

function Apply-BackendImageHotfix {
    param([Parameter(Mandatory = $true)][string]$RootPathValue)
    $routeFile = Join-Path $RootPathValue "resources\backend\src\automation\yingdao_routes_ops_task.py"
    if (-not (Test-Path -LiteralPath $routeFile)) {
        Write-Log "Backend route file not found, skip image hotfix: $routeFile" "WARN"
        return
    }
    $content = Get-Content -LiteralPath $routeFile -Raw -Encoding UTF8
    $updated = $content
    $mimeFallbackPattern = 'if not mimetype:\r?\n\s*mimetype = "application/octet-stream"'
    # NOTE: the leading whitespace inside this here-string was lost when the article was extracted;
    # re-indent the lines below so they match the indentation used in the target Python file.
    $mimeFallbackReplacement = @'
if not mimetype:
        suffix = file_path.suffix.lower()
        if suffix == ".webp":
            mimetype = "image/webp"
        elif suffix in {".jpg", ".jpeg"}:
            mimetype = "image/jpeg"
        elif suffix == ".png":
            mimetype = "image/png"
        elif suffix == ".gif":
            mimetype = "image/gif"
        else:
            mimetype = "application/octet-stream"
'@
    $updated = [System.Text.RegularExpressions.Regex]::Replace(
        $updated,
        $mimeFallbackPattern,
        $mimeFallbackReplacement
    )
    $updated = [System.Text.RegularExpressions.Regex]::Replace(
        $updated,
        'response\.headers\["Connection"\]\s*=\s*"keep-alive"',
        'response.headers.pop("Connection", None)'
    )
    if ($updated -ne $content) {
        Set-Content -LiteralPath $routeFile -Value $updated -Encoding UTF8
        Write-Log "Applied backend image hotfix: $routeFile"
    } else {
        Write-Log "Backend image hotfix already present: $routeFile"
    }
}

$root = Resolve-FullPath -PathValue $RootPath
$sourceData = Join-Path $root "temp\data"
$sourceConfig = Join-Path $root "temp\config"
$sourceImages = Join-Path $root "temp\images"
$targetData = Join-Path $root "data"
$targetConfig = Join-Path $root "config"
$targetImages = Join-Path $root "images"
$targetDb = Join-Path $targetData "database\app.db"

Write-Log "Root path: $root"
Write-Log "Source data: $sourceData"
Write-Log "Source config: $sourceConfig"
Write-Log "Source images: $sourceImages"

if (-not (Test-Path -LiteralPath $sourceData)) {
    throw "Missing source directory: $sourceData"
}
if (-not (Test-Path -LiteralPath $sourceConfig)) {
    throw "Missing source directory: $sourceConfig"
}

if (-not $SkipProcessCheck) {
    $running = Get-Process -ErrorAction SilentlyContinue | Where-Object { $_.ProcessName -eq "1688-OZON Automation" }
    if ($running) {
        throw "Detected running process 1688-OZON Automation. Please close app before migration."
    }
}

if (Test-FileLocked -PathValue $targetDb) {
    throw "Target database is locked: $targetDb"
}

if (-not $SkipBackup) {
    $stamp = Get-Date -Format "yyyyMMdd_HHmmss"
    $backupRoot = Join-Path $root ("migration_backup_$stamp")
    New-Item -ItemType Directory -Path $backupRoot -Force | Out-Null
    if (Test-Path -LiteralPath $targetConfig) {
        Copy-Item -LiteralPath $targetConfig -Destination (Join-Path $backupRoot "config") -Recurse -Force
    }
    if (Test-Path -LiteralPath $targetData) {
        Copy-Item -LiteralPath $targetData -Destination (Join-Path $backupRoot "data") -Recurse -Force
    }
    Write-Log "Backup created: $backupRoot"
} else {
    Write-Log "Skip backup requested" "WARN"
}

Write-Log "Copying config files from temp/config -> config ..."
Invoke-RobocopyMerge -Source $sourceConfig -Destination $targetConfig
Write-Log "Copying data files from temp/data -> data ..."
Invoke-RobocopyMerge -Source $sourceData -Destination $targetData
if (Test-Path -LiteralPath $sourceImages) {
    Write-Log "Copying image files from temp/images -> images ..."
    Invoke-RobocopyMerge -Source $sourceImages -Destination $targetImages
} else {
    Write-Log "temp/images not found, skip image copy. Existing images directory will be kept." "WARN"
}

if (-not (Test-Path -LiteralPath $targetDb)) {
    throw "Database not found after copy: $targetDb"
}

# Avoid stale SQLite sidecar files from previous runs.
foreach ($sidecar in @("$targetDb-wal", "$targetDb-shm")) {
    if (Test-Path -LiteralPath $sidecar) {
        Remove-Item -LiteralPath $sidecar -Force -ErrorAction SilentlyContinue
        Write-Log "Removed SQLite sidecar: $sidecar"
    }
}

$python = Get-PythonCommand
if (-not $python) {
    throw "Python runtime not found. Install Python 3 or ensure py/python is in PATH."
}

$pythonScript = @'
import json
import shutil
import sqlite3
import sys
from datetime import datetime
from pathlib import Path

db_path = Path(sys.argv[1])
images_root = Path(sys.argv[2]) if len(sys.argv) > 2 else None
if not db_path.exists():
    raise SystemExit(f"Database not found: {db_path}")

obsolete_tables = [
    "collection_tasks",
    "upload_tasks",
    "system_logs",
    "customer_service_tasks",
    "category_update_tasks",
]


def table_exists(cursor, table_name):
    cursor.execute(
        "SELECT 1 FROM sqlite_master WHERE type='table' AND name=? LIMIT 1",
        (table_name,),
    )
    return cursor.fetchone() is not None


def columns(cursor, table_name):
    cursor.execute(f"PRAGMA table_info({table_name})")
    return [row[1] for row in cursor.fetchall()]


def read_integrity(connection):
    _cur = connection.cursor()
    _cur.execute("PRAGMA integrity_check")
    return _cur.fetchone()[0]


def sql_quote_path(path_obj: Path) -> str:
    return str(path_obj).replace("'", "''")


def parse_json_list(value):
    if value is None:
        return []
    if isinstance(value, str):
        text = value.strip()
        if not text:
            return []
        try:
            decoded = json.loads(text)
        except Exception:
            return [text]
    else:
        decoded = value
    if isinstance(decoded, list):
        return decoded
    if isinstance(decoded, str):
        return [decoded]
    return []


def normalize_image_ref(value):
    raw = str(value or "").strip()
    if not raw:
        return None
    if raw.startswith(("http://", "https://")):
        return raw
    normalized = raw.replace("\\", "/")
    normalized = normalized.split("?", 1)[0].split("#", 1)[0]
    lower_text = normalized.lower()
    marker = "/images/"
    marker_index = lower_text.find(marker)
    if marker_index >= 0:
        relative = normalized[marker_index + len(marker):]
    elif lower_text.startswith("images/"):
        relative = normalized[len("images/"):]
    else:
        path_obj = Path(normalized)
        if path_obj.is_absolute():
            parts = [part for part in path_obj.parts if part not in (path_obj.anchor, "\\", "/")]
            lower_parts = [part.lower() for part in parts]
            if "images" in lower_parts:
                idx = lower_parts.index("images")
                relative = "/".join(parts[idx + 1 :])
            else:
                relative = "/".join(parts)
        else:
            relative = normalized
    relative = relative.lstrip("/").replace("\\", "/")
    if not relative:
        return None
    return f"/images/{relative}"


def normalize_image_list(values):
    result = []
    for item in values:
        normalized = normalize_image_ref(item)
        if not normalized:
            continue
        if normalized not in result:
            result.append(normalized)
    return result


def image_exists(image_url):
    if not isinstance(image_url, str) or not image_url.startswith("/images/"):
        return True
    if images_root is None:
        return True
    relative = image_url[len("/images/"):]
    return (images_root / relative).exists()


def repair_database_in_place(path_obj: Path) -> str:
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = path_obj.with_name(f"{path_obj.name}.before_auto_repair_{stamp}")
    repaired_path = path_obj.with_name(f"{path_obj.stem}.auto_repaired_{stamp}{path_obj.suffix}")
    shutil.copy2(path_obj, backup_path)
    source_conn = sqlite3.connect(str(path_obj))
    source_conn.execute(f"VACUUM INTO '{sql_quote_path(repaired_path)}'")
    source_conn.close()
    verify_conn = sqlite3.connect(str(repaired_path))
    repaired_integrity = read_integrity(verify_conn)
    verify_conn.close()
    if repaired_integrity != "ok":
        repaired_path.unlink(missing_ok=True)
        raise RuntimeError(f"auto repair failed, rebuilt db integrity is: {repaired_integrity}")
    path_obj.unlink(missing_ok=True)
    repaired_path.replace(path_obj)
    return str(backup_path)


conn = sqlite3.connect(str(db_path))
conn.execute("PRAGMA busy_timeout = 5000")
cur = conn.cursor()

added = []
skipped = []
dropped = []
not_found = []
normalized_image_rows = 0
normalized_image_fields = {"images": 0, "images_ru": 0}
image_refs_total = 0
image_refs_missing = 0
image_refs_missing_samples = []
auto_repair_performed = False
repair_backup = ""

integrity_before = read_integrity(conn)
if integrity_before != "ok":
    conn.close()
    repair_backup = repair_database_in_place(db_path)
    auto_repair_performed = True
    conn = sqlite3.connect(str(db_path))
    conn.execute("PRAGMA busy_timeout = 5000")
    cur = conn.cursor()

integrity_after_repair = read_integrity(conn)
if integrity_after_repair != "ok":
    raise RuntimeError(f"db integrity remains invalid after repair attempt: {integrity_after_repair}")

if table_exists(cur, "task_progress"):
    existing = set(columns(cur, "task_progress"))
    if "is_file" not in existing:
        cur.execute("ALTER TABLE task_progress ADD COLUMN is_file BOOLEAN DEFAULT 0")
        added.append("task_progress.is_file")
    else:
        skipped.append("task_progress.is_file")
else:
    raise RuntimeError("Missing required table: task_progress")

for name in obsolete_tables:
    if table_exists(cur, name):
        cur.execute(f"DROP TABLE {name}")
        dropped.append(name)
    else:
        not_found.append(name)

if table_exists(cur, "products"):
    cur.execute("SELECT id, images, images_ru FROM products")
    for product_id, images_raw, images_ru_raw in cur.fetchall():
        row_changed = False
        for field_name, field_value in (("images", images_raw), ("images_ru", images_ru_raw)):
            old_values = parse_json_list(field_value)
            new_values = normalize_image_list(old_values)
            if new_values != old_values:
                cur.execute(
                    f"UPDATE products SET {field_name} = ? WHERE id = ?",
                    (json.dumps(new_values, ensure_ascii=False), product_id),
                )
                normalized_image_fields[field_name] += 1
                row_changed = True
            for image_url in new_values:
                if not isinstance(image_url, str) or not image_url.startswith("/images/"):
                    continue
                image_refs_total += 1
                if not image_exists(image_url):
                    image_refs_missing += 1
                    if len(image_refs_missing_samples) < 20:
                        image_refs_missing_samples.append({
                            "product_id": product_id,
                            "field": field_name,
                            "image": image_url,
                        })
        if row_changed:
            normalized_image_rows += 1

conn.commit()
integrity_final = read_integrity(conn)
result = {
    "db_path": str(db_path),
    "added": added,
    "skipped": skipped,
    "dropped": dropped,
    "not_found": not_found,
    "auto_repair_performed": auto_repair_performed,
    "repair_backup": repair_backup,
    "integrity_before": integrity_before,
    "integrity_after_repair": integrity_after_repair,
    "integrity_final": integrity_final,
    "task_progress_columns": columns(cur, "task_progress"),
    "normalized_image_rows": normalized_image_rows,
    "normalized_image_fields": normalized_image_fields,
    "image_refs_total": image_refs_total,
    "image_refs_missing": image_refs_missing,
    "image_refs_missing_samples": image_refs_missing_samples,
}
conn.close()
print(json.dumps(result, ensure_ascii=False))
'@

$tempPy = Join-Path $env:TEMP ("ozon_migrate_" + [Guid]::NewGuid().ToString("N") + ".py")
Set-Content -LiteralPath $tempPy -Value $pythonScript -Encoding UTF8
try {
    $raw = & $python.Exe @($python.Args + @($tempPy, $targetDb, $targetImages)) 2>&1
    if ($LASTEXITCODE -ne 0) {
        throw ("Database migration failed:`n" + ($raw | Out-String))
    }
} finally {
    Remove-Item -LiteralPath $tempPy -Force -ErrorAction SilentlyContinue
}

$result = (($raw | Out-String).Trim()) | ConvertFrom-Json
if ($result.integrity_final -ne "ok") {
    throw "SQLite integrity_check failed: $($result.integrity_final)"
}

Write-Log "Database migrated: $($result.db_path)"
Write-Log "Added fields: $(($result.added) -join ', ')"
Write-Log "Dropped obsolete tables: $(($result.dropped) -join ', ')"
Write-Log "Integrity: before=$($result.integrity_before), after_repair=$($result.integrity_after_repair), final=$($result.integrity_final)"
if ($result.auto_repair_performed) {
    Write-Log "Auto-repaired SQLite before migration. Backup: $($result.repair_backup)" "WARN"
}
Write-Log "task_progress columns: $(($result.task_progress_columns) -join ', ')"
Write-Log "Normalized product image paths: rows=$($result.normalized_image_rows), images=$($result.normalized_image_fields.images), images_ru=$($result.normalized_image_fields.images_ru)"
Write-Log "Image reference check: total=$($result.image_refs_total), missing=$($result.image_refs_missing)"
if ($result.image_refs_missing -gt 0) {
    Write-Log ("Missing image samples: " + ((($result.image_refs_missing_samples) | ConvertTo-Json -Compress))) "WARN"
}

Apply-BackendImageHotfix -RootPathValue $root
Write-Log "Migration completed successfully."
Write-Log "Image migration: if temp/images exists it has been merged into images/."
```

运行命令:

```powershell
powershell -ExecutionPolicy Bypass -File migrate_temp_data.ps1
```

一、脚本核心功能

参数配置
- 可指定根路径(默认脚本所在目录)
- 支持跳过备份、跳过进程检查
- 严格模式执行,错误立即终止(ErrorActionPreference = Stop)

基础工具函数
- Write-Log:带时间戳、日志级别的日志输出
- Resolve-FullPath:解析绝对路径,支持允许路径不存在的场景
- Get-PythonCommand:自动检测系统中的 Python(py/python/python3)
- Test-FileLocked:检测文件是否被锁定(用于检查数据库是否被占用)
- Invoke-RobocopyMerge:调用 robocopy 合并目录(保留目标目录原有文件)

前置检查
- 校验源目录(temp/data、temp/config)是否存在
- 检查目标数据库(data/database/app.db)是否被锁定
- 可选检查 “1688-OZON Automation” 进程是否运行(避免文件占用)

数据备份
- 默认对目标 config、data 目录备份,命名格式:migration_backup_时间戳
- 支持通过 -SkipBackup 参数跳过备份

数据迁移
- 用 robocopy 将 temp/config → config、temp/data → data、temp/images → images(目录合并,不删除目标原有文件)
- 清理 SQLite 临时文件(app.db-wal、app.db-shm)

数据库处理(核心 Python 脚本)
- 自动修复:检测数据库完整性,若损坏则自动修复并备份原文件
- 字段调整:给 task_progress 表添加 is_file 字段(不存在时)
- 清理废弃表:删除 collection_tasks、upload_tasks 等废弃表
- 图片路径标准化:统一 products 表的 images、images_ru 字段路径格式,转为 /images/ 相对路径
- 检查图片文件是否存在,输出缺失的图片引用
- 完整性校验:迁移后再次校验数据库完整性

后端代码热修复
- 修正 yingdao_routes_ops_task.py 中的 MIME 类型处理(支持 webp/jpg/png/gif)
- 移除 Connection: keep-alive 响应头

二、关键使用场景

```powershell
# 基础使用(默认路径 + 自动备份 + 检查进程)
.\migrate_temp_data.ps1

# 指定根路径
.\migrate_temp_data.ps1 -RootPath D:\your-app-root

# 跳过备份 + 跳过进程检查
.\migrate_temp_data.ps1 -SkipBackup -SkipProcessCheck
```

三、核心注意事项

四、常见问题排查

如果需要对脚本进行修改(比如新增废弃表、调整图片 MIME 类型、修改备份规则等),可以针对性调整对应函数或 Python 脚本片段。

- 依赖 Python:脚本依赖 Python 3 环境,需确保 py/python/python3 在系统 PATH 中
- 文件锁定:目标数据库被占用时会终止执行,需关闭相关进程
- 日志输出:所有操作有详细日志,可通过控制台查看执行状态
- 图片迁移:仅当 temp/images 存在时才会合并,目标 images 目录原有文件会保留
- 数据库修复:若数据库损坏,会自动修复并备份原文件,修复失败会抛出异常

- Python 未找到:安装 Python 并添加到 PATH,或手动指定 Python 路径
- 数据库锁定:关闭 “1688-OZON Automation” 进程,或用 -SkipProcessCheck 跳过检查(不推荐)
- robocopy 报错:检查源 / 目标目录权限,确保 robocopy 可执行(Windows 自带)
- 图片缺失警告:日志中显示的缺失图片,需手动核对文件是否存在于 images 目录
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2517905.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!