步骤一:下载&解压
官网 : Github:redis-windows/releases 下载 redis 压缩包,也可通过文末附件下载
步骤二:复制&保存脚本
复制当前 py 脚本到redis解压缩 根目录保存为xxx.py,比如cluster.py 或者文末附件下载
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Redis Cluster 交互式初始化脚本
功能:
1. 自动创建 nodes/7001-7006 目录及 redis.conf
2. 停止/启动 Redis 节点
3. 创建 3 主 3 从集群
4. 检查集群状态
5. 支持一键执行全部步骤
交互方式:
- 运行后显示菜单
- ↑ ↓ 方向键选择
- 直接按数字键 1-7 执行对应项
- 回车确认当前选中项
- q 或 ESC 退出
"""
import os
import subprocess
import sys
import time
# Windows 专用键盘读取
import msvcrt
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
NODES_DIR = os.path.join(BASE_DIR, "nodes")
PORTS = [7001, 7002, 7003, 7004, 7005, 7006]
FIRST_PORT = PORTS[0]
# ACL 认证用户(与 redis.conf 中的 user 配置保持一致)
ADMIN_USER = "pufa_admin"
ADMIN_PASS = "admin123456"
APP_USER = "pufa_app"
APP_PASS = "your-password"
APP_KEY_PATTERN = "~*" # pufa_app 可访问的 key 前缀,如 ~* ~pufa_app:* 或 ~otherkey:* 所有 ~*
CONFIG_TEMPLATE = """# ========== 基础 ==========
port {port}
bind 0.0.0.0
protected-mode no
daemonize no
dir "{node_dir}"
# ========== 集群 ==========
cluster-enabled yes
cluster-config-file nodes-{port}.conf
cluster-node-timeout 5000
# ========== 持久化 ==========
appendonly yes
appendfilename "appendonly-{port}.aof"
dbfilename "dump-{port}.rdb"
stop-writes-on-bgsave-error yes
# ========== ACL ==========
# 全部 +@all 部分 -@dangerous +cluster +client +info +publish +subscribe +eval +evalsha +script
user default off
user {app_user} on >{app_pass} {app_key_pattern} &* +@all
user {admin_user} on >{admin_pass} ~* &* +@all
# 节点间通信
masteruser {app_user}
masterauth {app_pass}
logfile "redis.log"
"""
def log(message: str) -> None:
print(f"[INFO] {message}")
def warn(message: str) -> None:
print(f"[WARN] {message}", file=sys.stderr)
def error(message: str) -> None:
print(f"[ERROR] {message}", file=sys.stderr)
def run_redis_cli(args: list, **kwargs) -> subprocess.CompletedProcess:
"""使用 ACL 管理员用户执行 redis-cli(default 用户已关闭,必须显式认证)"""
kwargs.setdefault("cwd", BASE_DIR)
kwargs.setdefault("env", os.environ.copy())
auth_args = ["--user", ADMIN_USER, "--pass", ADMIN_PASS, "--no-auth-warning"]
return subprocess.run(["redis-cli"] + auth_args + args, **kwargs)
def is_node_running(port: int) -> bool:
"""检查指定端口节点是否正在运行"""
try:
result = run_redis_cli(
["-p", str(port), "ping"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=3,
check=False,
)
return b"PONG" in result.stdout
except Exception:
return False
def get_running_nodes() -> list:
"""返回正在运行的节点端口列表"""
return [port for port in PORTS if is_node_running(port)]
def is_cluster_created() -> bool:
"""检查集群是否已创建且状态正常"""
if not is_node_running(FIRST_PORT):
return False
try:
result = run_redis_cli(
["-p", str(FIRST_PORT), "cluster", "info"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=3,
check=False,
)
return b"cluster_state:ok" in result.stdout
except Exception:
return False
def are_configs_ready() -> bool:
"""检查所有节点配置文件是否已存在"""
for port in PORTS:
config_path = os.path.join(NODES_DIR, str(port), "redis.conf")
if not os.path.exists(config_path):
return False
return True
def ask_yes_no(question: str) -> bool:
"""询问用户是否继续"""
while True:
answer = input(f"{question} (y/n): ").strip().lower()
if answer in ("y", "yes"):
return True
if answer in ("n", "no"):
return False
def step_create_nodes() -> bool:
"""步骤 1:创建节点目录和配置文件"""
log("步骤 1:创建节点配置")
if are_configs_ready():
warn("所有节点配置文件已存在")
if not ask_yes_no("是否重新生成配置?"):
log("跳过创建配置")
return True
if not os.path.exists(NODES_DIR):
os.makedirs(NODES_DIR)
log(f"创建目录: {NODES_DIR}")
for port in PORTS:
node_dir = os.path.join(NODES_DIR, str(port))
if not os.path.exists(node_dir):
os.makedirs(node_dir)
log(f"创建节点目录: {node_dir}")
config_path = os.path.join(node_dir, "redis.conf")
node_dir_normalized = node_dir.replace("\\", "/")
config_content = CONFIG_TEMPLATE.format(
port=port,
node_dir=node_dir_normalized,
app_user=APP_USER,
app_pass=APP_PASS,
app_key_pattern=APP_KEY_PATTERN,
admin_user=ADMIN_USER,
admin_pass=ADMIN_PASS,
)
with open(config_path, "w", encoding="utf-8") as f:
f.write(config_content)
log(f"写入配置: {config_path}")
log("节点配置创建完成")
return True
def step_stop_nodes() -> bool:
"""步骤 2:停止正在运行的节点"""
log("步骤 2:停止已有节点")
running = get_running_nodes()
if not running:
warn("没有检测到运行中的节点")
return True
log(f"检测到运行中的节点: {running}")
for port in running:
try:
run_redis_cli(
["-p", str(port), "shutdown"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
)
log(f"已停止 Redis-{port}")
except Exception as e:
error(f"停止 Redis-{port} 失败: {e}")
time.sleep(2)
log("节点停止完成")
return True
def step_start_nodes() -> bool:
"""步骤 3:启动未运行的节点"""
log("步骤 3:启动 Redis 节点")
running = get_running_nodes()
to_start = [port for port in PORTS if port not in running]
if not to_start:
warn("所有节点已经在运行中")
return True
log(f"需要启动的节点: {to_start}")
for port in to_start:
config_path = os.path.join(NODES_DIR, str(port), "redis.conf")
if not os.path.exists(config_path):
error(f"配置文件不存在: {config_path}")
continue
try:
# 参考 cluster-restart.bat
cmd = f'start "Redis-{port}" /min redis-server.exe nodes\\{port}\\redis.conf'
subprocess.Popen(cmd, cwd=BASE_DIR, shell=True)
log(f"已启动 Redis-{port}")
except Exception as e:
error(f"启动 Redis-{port} 失败: {e}")
log("等待节点就绪...")
time.sleep(4)
# 自检启动结果
still_down = [port for port in to_start if not is_node_running(port)]
if still_down:
error(f"以下节点未成功启动: {still_down}")
return False
log("节点启动完成")
return True
def step_create_cluster() -> bool:
"""步骤 4:创建集群"""
log("步骤 4:创建 Redis Cluster")
if is_cluster_created():
warn("集群已经创建并且状态正常")
return True
if not is_node_running(FIRST_PORT):
error("首个节点未运行,请先执行启动节点")
return False
nodes_addr = [f"127.0.0.1:{port}" for port in PORTS]
cmd = (
["redis-cli", "--user", ADMIN_USER, "--pass", ADMIN_PASS, "--no-auth-warning", "--cluster", "create"]
+ nodes_addr
+ ["--cluster-replicas", "1", "--cluster-yes"]
)
result = subprocess.run(cmd, cwd=BASE_DIR, env=os.environ.copy())
time.sleep(2)
if is_cluster_created():
log("集群创建成功")
return True
else:
warn("集群创建命令已执行,但状态未就绪,请检查日志")
return False
def step_check_cluster() -> bool:
"""步骤 5:检查集群状态"""
log("步骤 5:检查集群状态")
if not is_node_running(FIRST_PORT):
error("首个节点未运行,无法检查集群状态")
return False
log("Cluster Info:")
run_redis_cli(["-p", str(FIRST_PORT), "cluster", "info"])
print()
log("Cluster Nodes:")
run_redis_cli(["-p", str(FIRST_PORT), "cluster", "nodes"])
print()
return True
def step_run_all() -> bool:
"""步骤 6:一键执行全部步骤"""
log("一键执行全部步骤")
if not step_create_nodes():
return False
if not step_stop_nodes():
return False
if not step_start_nodes():
return False
if not step_create_cluster():
return False
if not step_check_cluster():
return False
log("全部步骤执行完成")
return True
MENU_OPTIONS = [
{"name": "创建节点配置", "func": step_create_nodes},
{"name": "停止已有节点", "func": step_stop_nodes},
{"name": "启动所有节点", "func": step_start_nodes},
{"name": "创建集群", "func": step_create_cluster},
{"name": "检查集群状态", "func": step_check_cluster},
{"name": "一键执行全部", "func": step_run_all},
{"name": "退出", "func": None},
]
def clear_screen() -> None:
os.system("cls")
def show_menu(selected: int) -> None:
clear_screen()
print("=" * 40)
print(" Redis Cluster 初始化控制台")
print("=" * 40)
print()
print("操作说明:↑ ↓ 选择 | 数字 1-7 直接执行 | 回车确认 | q 退出")
print()
for i, opt in enumerate(MENU_OPTIONS):
marker = "▶" if i == selected else " "
print(f" {marker} {i + 1}. {opt['name']}")
print()
def wait_return() -> None:
print()
input("按回车键返回菜单...")
def execute_option(index: int) -> bool:
"""执行菜单选项,返回是否继续显示菜单"""
if index < 0 or index >= len(MENU_OPTIONS):
return True
opt = MENU_OPTIONS[index]
if opt["func"] is None:
log("退出程序")
return False
print()
success = opt["func"]()
wait_return()
return True
def read_key() -> str:
"""读取一个键盘按键,返回按键类型"""
ch = msvcrt.getch()
# 特殊键(方向键等)以 b'\x00' 或 b'\xe0' 开头
if ch in (b"\x00", b"\xe0"):
ch2 = msvcrt.getch()
if ch2 == b"H":
return "UP"
if ch2 == b"P":
return "DOWN"
if ch2 == b"K":
return "LEFT"
if ch2 == b"M":
return "RIGHT"
return "OTHER"
# 普通按键
if ch == b"\r":
return "ENTER"
if ch == b"\x1b":
return "ESC"
if ch in (b"q", b"Q"):
return "QUIT"
if ch.isdigit():
return ch.decode("ascii")
return "OTHER"
def main() -> int:
selected = 0
while True:
show_menu(selected)
key = read_key()
if key == "UP":
selected = (selected - 1) % len(MENU_OPTIONS)
elif key == "DOWN":
selected = (selected + 1) % len(MENU_OPTIONS)
elif key == "ENTER":
if not execute_option(selected):
break
elif key == "ESC" or key == "QUIT":
log("退出程序")
break
elif key.isdigit():
index = int(key) - 1
if 0 <= index < len(MENU_OPTIONS):
if not execute_option(index):
break
else:
warn("无效选项")
time.sleep(0.5)
return 0
if __name__ == "__main__":
sys.exit(main())
步骤三:执行脚本
进入当前命令行执行 你们重命名就用你们自己的文件名
py cluster.py效果如下:

第一次选 6 就行了

等他跑完,执行 5 检查一下集群状态,出现ok就是正常

链接&测试
这里使用DBX ,去官网 DBX - 15MB,管理50+种数据库! | DBX 下载最新版或者文末附件下载
选择Redis 连接方式选 集群,端口和密码 在py脚本里面,如下图已经测试链接成功了

其他注意事项
1、修改端口和密码:
在py脚本里面修改端口和密码,如果已经创建直接删除nodes文件夹重新执行就行了
2、springboot配置
集群模式:不要再配 host / port ,配置下方这个,会自动识别
redis:
# 集群模式:不要再配 host / port
username: pufa_app
password: spdb-ai-call
timeout: 10s
cluster:
nodes:
- 127.0.0.1:7001
- 127.0.0.1:7002
- 127.0.0.1:7003
- 127.0.0.1:7004
- 127.0.0.1:7005
- 127.0.0.1:7006
max-redirects: 3 # MOVED 重定向次数,默认 3
lettuce:
pool:
max-active: 8
max-idle: 8
min-idle: 0
max-wait: -1ms
# 关键:集群拓扑刷新时自动认证
cluster:
refresh:
adaptive: true
period: 60s附件:
Redis 压缩包:Redis-6.2.18-Windows-x64-msys2.7z
Python脚本:cluster.py
DBX安装包:DBX_0.5.31_x64-setup.exe