
Overview

2025.03.17
3
PythonKafka 交互消息队列管理集群监控数据库
mcp-kafka 是一个基于模型上下文协议(MCP)的服务端实现,专为 Apache Kafka 及其生态系统设计。它允许语言模型(LLM/SLM)与 Kafka 及其相关工具(如 Kafka Connect、Burrow 和 Cruise Control)进行可靠交互。该服务目前处于开发中(WIP),支持 Kafka 核心 API(不包括 Streams)以及 Burrow 和 Cruise Control 的 REST API。
View on GitHub
Overview
基本能力
产品定位
mcp-kafka 是一个用于与 Apache Kafka 及其生态系统交互的服务端工具,旨在为语言模型提供可靠的 Kafka 操作能力。
核心功能
- 支持 Kafka 核心 API(Admin、Consumer、Producer)
- 支持 Kafka Connect API
- 支持 Burrow API
- 支持 Cruise Control API
适用场景
- 语言模型需要与 Kafka 集群进行交互的场景
- 需要监控和管理 Kafka 集群及其生态工具的场景
- 自动化 Kafka 相关操作的工作流
工具列表
Kafka
consume
: 消费 Kafka 消息produce
: 生产 Kafka 消息describe_kafka_cluster
: 描述 Kafka 集群信息describe_kafka_topics
: 描述 Kafka 主题信息describe_kafka_consumer_groups
: 描述消费者组信息describe_kafka_delegation_tokens
: 描述委托令牌信息describe_kafka_log_dirs
: 描述日志目录信息describe_kafka_configs
: 描述配置信息describe_kafka_acls
: 描述 ACL 信息
Kafka Connect
get_kafka_connect_cluster_info
: 获取 Kafka Connect 集群信息get_kafka_connect_config
: 获取 Kafka Connect 配置get_kafka_connect_connectors
: 获取 Kafka Connect 连接器get_kafka_connect_connector_plugins
: 获取 Kafka Connect 连接器插件get_kafka_connect_loggers
: 获取 Kafka Connect 日志记录器
Burrow
burrow_healthcheck
: Burrow 健康检查burrow_list_clusters
: 列出 Burrow 集群burrow_describe_cluster
: 描述 Burrow 集群burrow_list_consumers_with_group_detail
: 列出消费者及其组详细信息burrow_list_topics
: 列出主题burrow_check_consumer_group_status
: 检查消费者组状态
Cruise Control
cruise_control_get_state
: 获取 Cruise Control 状态cruise_control_get_kafka_cluster_load
: 获取 Kafka 集群负载cruise_control_get_partition_resource_utilization_and_load
: 获取分区资源利用率和负载cruise_control_get_partition_and_replica_state
: 获取分区和副本状态cruise_control_get_optimization_proposals
: 获取优化建议cruise_control_get_user_request_result
: 获取用户请求结果
常见问题解答
- 目前服务处于开发中(WIP),可能会有变化或潜在错误
- 支持的 API 需要通过设置相应的环境变量来启用
使用教程
使用依赖
需要安装 Python 环境,建议使用 Python 3.7+。
安装教程
目前尚未发布到 PyPI,安装方式将在未来版本中提供。计划支持以下安装方式:
uv pip install mcp-kafka
或通过 Docker 安装:
docker pull bkpowers/mcp-kafka
调试方式
服务需要通过设置以下环境变量来启用相应 API:
export KAFKA_BOOTSTRAP_SERVERS=your_bootstrap_servers
export KAFKA_CONNECT_API_URL=your_connect_api_url
export KAFKA_BURROW_API_URL=your_burrow_api_url
export KAFKA_CRUISE_CONTROL_API_URL=your_cruise_control_api_url
启动服务后,可以通过相应的工具命令进行调试。