
Kafka MCP Server

2025.03.24
0
Python消息中间件AI 集成实时数据处理开发效率交流协作
Kafka MCP Server 是一个基于 Apache Kafka 的消息上下文协议(MCP)服务器,专为 LLM 和 Agentic 应用设计,提供发布和消费 Kafka 消息的功能。该服务器通过标准化接口使 AI 模型能够与 Kafka 主题进行交互,支持消息的发布和消费。
View on GitHub
Overview
基本能力
产品定位
Kafka MCP Server 是一个消息中间件服务,专注于为 AI 模型和代理应用提供与 Kafka 的集成能力。
核心功能
- 发布消息到 Kafka 主题
- 从 Kafka 主题消费消息
适用场景
- AI 模型与 Kafka 的集成
- 代理应用的消息传递
- 实时数据处理和分析
工具列表
- kafka-publish: 发布信息到配置的 Kafka 主题
- kafka-consume: 从配置的 Kafka 主题消费信息(注意:一旦消息被读取,使用相同的 groupid 将无法再次读取)
常见问题解答
- 消息消费后无法再次读取:确保使用不同的 groupid 或从主题的开头重新读取。
使用教程
使用依赖
- Python 3.8+
- Apache Kafka 实例
- Python 依赖(见安装部分)
安装教程
-
克隆仓库:
bash git clone <repository-url> cd <repository-directory>
-
创建并激活虚拟环境:
bash python -m venv venv source venv/bin/activate # Windows 使用: venv\Scripts\activate
-
安装依赖:
bash pip install -r requirements.txt
若无 requirements.txt,安装以下包:bash pip install aiokafka python-dotenv pydantic-settings mcp-server
配置
在项目根目录创建 .env
文件,配置以下变量:
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
TOPIC_NAME=your-topic-name
IS_TOPIC_READ_FROM_BEGINNING=False
DEFAULT_GROUP_ID_FOR_CONSUMER=kafka-mcp-group
调试方式
运行服务器:
python main.py --transport stdio
可用传输选项:
- stdio
: 标准输入/输出(默认)
- sse
: 服务器发送事件
与 Claude Desktop 集成: 在 Claude Desktop 配置文件中添加以下配置:
{
"mcpServers": {
"kafka": {
"command": "python",
"args": [
"<PATH TO PROJECTS>/main.py"
]
}
}
}
替换 <PATH TO PROJECTS>
为项目目录的绝对路径。