步骤一:下载&解压

官网 : Github:redis-windows/releases 下载 redis 压缩包,也可通过文末附件下载

步骤二:复制&保存脚本

复制当前 py 脚本到redis解压缩 根目录保存为xxx.py,比如cluster.py 或者文末附件下载

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Redis Cluster 交互式初始化脚本

功能:
1. 自动创建 nodes/7001-7006 目录及 redis.conf
2. 停止/启动 Redis 节点
3. 创建 3 主 3 从集群
4. 检查集群状态
5. 支持一键执行全部步骤

交互方式:
- 运行后显示菜单
- ↑ ↓ 方向键选择
- 直接按数字键 1-7 执行对应项
- 回车确认当前选中项
- q 或 ESC 退出
"""

import os
import subprocess
import sys
import time

# Windows 专用键盘读取
import msvcrt

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
NODES_DIR = os.path.join(BASE_DIR, "nodes")
PORTS = [7001, 7002, 7003, 7004, 7005, 7006]
FIRST_PORT = PORTS[0]

# ACL 认证用户(与 redis.conf 中的 user 配置保持一致)
ADMIN_USER = "pufa_admin"
ADMIN_PASS = "admin123456"
APP_USER = "pufa_app"
APP_PASS = "your-password"
APP_KEY_PATTERN = "~*"  # pufa_app 可访问的 key 前缀,如 ~* ~pufa_app:* 或 ~otherkey:* 所有 ~*

CONFIG_TEMPLATE = """# ========== 基础 ==========
port {port}
bind 0.0.0.0
protected-mode no
daemonize no
dir "{node_dir}"

# ========== 集群 ==========
cluster-enabled yes
cluster-config-file nodes-{port}.conf
cluster-node-timeout 5000

# ========== 持久化 ==========
appendonly yes
appendfilename "appendonly-{port}.aof"
dbfilename "dump-{port}.rdb"
stop-writes-on-bgsave-error yes

# ========== ACL ==========
# 全部 +@all 部分 -@dangerous +cluster +client +info +publish +subscribe +eval +evalsha +script
user default off
user {app_user} on >{app_pass} {app_key_pattern} &* +@all
user {admin_user} on >{admin_pass} ~* &* +@all

# 节点间通信
masteruser {app_user}
masterauth {app_pass}
logfile "redis.log"
"""


def log(message: str) -> None:
    print(f"[INFO] {message}")


def warn(message: str) -> None:
    print(f"[WARN] {message}", file=sys.stderr)


def error(message: str) -> None:
    print(f"[ERROR] {message}", file=sys.stderr)


def run_redis_cli(args: list, **kwargs) -> subprocess.CompletedProcess:
    """使用 ACL 管理员用户执行 redis-cli(default 用户已关闭,必须显式认证)"""
    kwargs.setdefault("cwd", BASE_DIR)
    kwargs.setdefault("env", os.environ.copy())
    auth_args = ["--user", ADMIN_USER, "--pass", ADMIN_PASS, "--no-auth-warning"]
    return subprocess.run(["redis-cli"] + auth_args + args, **kwargs)


def is_node_running(port: int) -> bool:
    """检查指定端口节点是否正在运行"""
    try:
        result = run_redis_cli(
            ["-p", str(port), "ping"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=3,
            check=False,
        )
        return b"PONG" in result.stdout
    except Exception:
        return False


def get_running_nodes() -> list:
    """返回正在运行的节点端口列表"""
    return [port for port in PORTS if is_node_running(port)]


def is_cluster_created() -> bool:
    """检查集群是否已创建且状态正常"""
    if not is_node_running(FIRST_PORT):
        return False
    try:
        result = run_redis_cli(
            ["-p", str(FIRST_PORT), "cluster", "info"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=3,
            check=False,
        )
        return b"cluster_state:ok" in result.stdout
    except Exception:
        return False


def are_configs_ready() -> bool:
    """检查所有节点配置文件是否已存在"""
    for port in PORTS:
        config_path = os.path.join(NODES_DIR, str(port), "redis.conf")
        if not os.path.exists(config_path):
            return False
    return True


def ask_yes_no(question: str) -> bool:
    """询问用户是否继续"""
    while True:
        answer = input(f"{question} (y/n): ").strip().lower()
        if answer in ("y", "yes"):
            return True
        if answer in ("n", "no"):
            return False


def step_create_nodes() -> bool:
    """步骤 1:创建节点目录和配置文件"""
    log("步骤 1:创建节点配置")

    if are_configs_ready():
        warn("所有节点配置文件已存在")
        if not ask_yes_no("是否重新生成配置?"):
            log("跳过创建配置")
            return True

    if not os.path.exists(NODES_DIR):
        os.makedirs(NODES_DIR)
        log(f"创建目录: {NODES_DIR}")

    for port in PORTS:
        node_dir = os.path.join(NODES_DIR, str(port))
        if not os.path.exists(node_dir):
            os.makedirs(node_dir)
            log(f"创建节点目录: {node_dir}")

        config_path = os.path.join(node_dir, "redis.conf")
        node_dir_normalized = node_dir.replace("\\", "/")
        config_content = CONFIG_TEMPLATE.format(
            port=port,
            node_dir=node_dir_normalized,
            app_user=APP_USER,
            app_pass=APP_PASS,
            app_key_pattern=APP_KEY_PATTERN,
            admin_user=ADMIN_USER,
            admin_pass=ADMIN_PASS,
        )
        with open(config_path, "w", encoding="utf-8") as f:
            f.write(config_content)
        log(f"写入配置: {config_path}")

    log("节点配置创建完成")
    return True


def step_stop_nodes() -> bool:
    """步骤 2:停止正在运行的节点"""
    log("步骤 2:停止已有节点")

    running = get_running_nodes()
    if not running:
        warn("没有检测到运行中的节点")
        return True

    log(f"检测到运行中的节点: {running}")
    for port in running:
        try:
            run_redis_cli(
                ["-p", str(port), "shutdown"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=False,
            )
            log(f"已停止 Redis-{port}")
        except Exception as e:
            error(f"停止 Redis-{port} 失败: {e}")

    time.sleep(2)
    log("节点停止完成")
    return True


def step_start_nodes() -> bool:
    """步骤 3:启动未运行的节点"""
    log("步骤 3:启动 Redis 节点")

    running = get_running_nodes()
    to_start = [port for port in PORTS if port not in running]

    if not to_start:
        warn("所有节点已经在运行中")
        return True

    log(f"需要启动的节点: {to_start}")
    for port in to_start:
        config_path = os.path.join(NODES_DIR, str(port), "redis.conf")
        if not os.path.exists(config_path):
            error(f"配置文件不存在: {config_path}")
            continue

        try:
            # 参考 cluster-restart.bat
            cmd = f'start "Redis-{port}" /min redis-server.exe nodes\\{port}\\redis.conf'
            subprocess.Popen(cmd, cwd=BASE_DIR, shell=True)
            log(f"已启动 Redis-{port}")
        except Exception as e:
            error(f"启动 Redis-{port} 失败: {e}")

    log("等待节点就绪...")
    time.sleep(4)

    # 自检启动结果
    still_down = [port for port in to_start if not is_node_running(port)]
    if still_down:
        error(f"以下节点未成功启动: {still_down}")
        return False

    log("节点启动完成")
    return True


def step_create_cluster() -> bool:
    """步骤 4:创建集群"""
    log("步骤 4:创建 Redis Cluster")

    if is_cluster_created():
        warn("集群已经创建并且状态正常")
        return True

    if not is_node_running(FIRST_PORT):
        error("首个节点未运行,请先执行启动节点")
        return False

    nodes_addr = [f"127.0.0.1:{port}" for port in PORTS]
    cmd = (
        ["redis-cli", "--user", ADMIN_USER, "--pass", ADMIN_PASS, "--no-auth-warning", "--cluster", "create"]
        + nodes_addr
        + ["--cluster-replicas", "1", "--cluster-yes"]
    )
    result = subprocess.run(cmd, cwd=BASE_DIR, env=os.environ.copy())

    time.sleep(2)

    if is_cluster_created():
        log("集群创建成功")
        return True
    else:
        warn("集群创建命令已执行,但状态未就绪,请检查日志")
        return False


def step_check_cluster() -> bool:
    """步骤 5:检查集群状态"""
    log("步骤 5:检查集群状态")

    if not is_node_running(FIRST_PORT):
        error("首个节点未运行,无法检查集群状态")
        return False

    log("Cluster Info:")
    run_redis_cli(["-p", str(FIRST_PORT), "cluster", "info"])
    print()

    log("Cluster Nodes:")
    run_redis_cli(["-p", str(FIRST_PORT), "cluster", "nodes"])
    print()
    return True


def step_run_all() -> bool:
    """步骤 6:一键执行全部步骤"""
    log("一键执行全部步骤")

    if not step_create_nodes():
        return False
    if not step_stop_nodes():
        return False
    if not step_start_nodes():
        return False
    if not step_create_cluster():
        return False
    if not step_check_cluster():
        return False

    log("全部步骤执行完成")
    return True


MENU_OPTIONS = [
    {"name": "创建节点配置", "func": step_create_nodes},
    {"name": "停止已有节点", "func": step_stop_nodes},
    {"name": "启动所有节点", "func": step_start_nodes},
    {"name": "创建集群", "func": step_create_cluster},
    {"name": "检查集群状态", "func": step_check_cluster},
    {"name": "一键执行全部", "func": step_run_all},
    {"name": "退出", "func": None},
]


def clear_screen() -> None:
    os.system("cls")


def show_menu(selected: int) -> None:
    clear_screen()
    print("=" * 40)
    print("    Redis Cluster 初始化控制台")
    print("=" * 40)
    print()
    print("操作说明:↑ ↓ 选择 | 数字 1-7 直接执行 | 回车确认 | q 退出")
    print()
    for i, opt in enumerate(MENU_OPTIONS):
        marker = "▶" if i == selected else " "
        print(f"  {marker} {i + 1}. {opt['name']}")
    print()


def wait_return() -> None:
    print()
    input("按回车键返回菜单...")


def execute_option(index: int) -> bool:
    """执行菜单选项,返回是否继续显示菜单"""
    if index < 0 or index >= len(MENU_OPTIONS):
        return True

    opt = MENU_OPTIONS[index]
    if opt["func"] is None:
        log("退出程序")
        return False

    print()
    success = opt["func"]()
    wait_return()
    return True


def read_key() -> str:
    """读取一个键盘按键,返回按键类型"""
    ch = msvcrt.getch()
    # 特殊键(方向键等)以 b'\x00' 或 b'\xe0' 开头
    if ch in (b"\x00", b"\xe0"):
        ch2 = msvcrt.getch()
        if ch2 == b"H":
            return "UP"
        if ch2 == b"P":
            return "DOWN"
        if ch2 == b"K":
            return "LEFT"
        if ch2 == b"M":
            return "RIGHT"
        return "OTHER"

    # 普通按键
    if ch == b"\r":
        return "ENTER"
    if ch == b"\x1b":
        return "ESC"
    if ch in (b"q", b"Q"):
        return "QUIT"
    if ch.isdigit():
        return ch.decode("ascii")
    return "OTHER"


def main() -> int:
    selected = 0

    while True:
        show_menu(selected)
        key = read_key()

        if key == "UP":
            selected = (selected - 1) % len(MENU_OPTIONS)
        elif key == "DOWN":
            selected = (selected + 1) % len(MENU_OPTIONS)
        elif key == "ENTER":
            if not execute_option(selected):
                break
        elif key == "ESC" or key == "QUIT":
            log("退出程序")
            break
        elif key.isdigit():
            index = int(key) - 1
            if 0 <= index < len(MENU_OPTIONS):
                if not execute_option(index):
                    break
            else:
                warn("无效选项")
                time.sleep(0.5)

    return 0


if __name__ == "__main__":
    sys.exit(main())

步骤三:执行脚本

进入当前命令行执行 你们重命名就用你们自己的文件名

py cluster.py

效果如下:

第一次选 6 就行了

等他跑完,执行 5 检查一下集群状态,出现ok就是正常

链接&测试

这里使用DBX ,去官网 DBX - 15MB,管理50+种数据库! | DBX 下载最新版或者文末附件下载

选择Redis 连接方式选 集群,端口和密码 在py脚本里面,如下图已经测试链接成功了

其他注意事项

1、修改端口和密码:

在py脚本里面修改端口和密码,如果已经创建直接删除nodes文件夹重新执行就行了

2、springboot配置

集群模式:不要再配 host / port ,配置下方这个,会自动识别

  redis:
    # 集群模式:不要再配 host / port  
    username: pufa_app
    password: spdb-ai-call 
    timeout: 10s
    cluster:
      nodes:
        - 127.0.0.1:7001
        - 127.0.0.1:7002
        - 127.0.0.1:7003
        - 127.0.0.1:7004
        - 127.0.0.1:7005
        - 127.0.0.1:7006
      max-redirects: 3   # MOVED 重定向次数,默认 3
    lettuce:
      pool:
        max-active: 8
        max-idle: 8
        min-idle: 0
        max-wait: -1ms
       # 关键:集群拓扑刷新时自动认证
      cluster:
        refresh:
          adaptive: true
          period: 60s

附件:

Redis 压缩包:Redis-6.2.18-Windows-x64-msys2.7z

Python脚本:cluster.py

DBX安装包:DBX_0.5.31_x64-setup.exe