Kafka MCP Server

Kafka MCP Server

site icon
2025.03.24 0
Python消息中间件AI 集成实时数据处理开发效率交流协作
Kafka MCP Server 是一个基于 Apache Kafka 的消息上下文协议(MCP)服务器,专为 LLM 和 Agentic 应用设计,提供发布和消费 Kafka 消息的功能。该服务器通过标准化接口使 AI 模型能够与 Kafka 主题进行交互,支持消息的发布和消费。
View on GitHub

Overview

基本能力

产品定位

Kafka MCP Server 是一个消息中间件服务,专注于为 AI 模型和代理应用提供与 Kafka 的集成能力。

核心功能

  • 发布消息到 Kafka 主题
  • 从 Kafka 主题消费消息

适用场景

  • AI 模型与 Kafka 的集成
  • 代理应用的消息传递
  • 实时数据处理和分析

工具列表

  • kafka-publish: 发布信息到配置的 Kafka 主题
  • kafka-consume: 从配置的 Kafka 主题消费信息(注意:一旦消息被读取,使用相同的 groupid 将无法再次读取)

常见问题解答

  • 消息消费后无法再次读取:确保使用不同的 groupid 或从主题的开头重新读取。

使用教程

使用依赖

  • Python 3.8+
  • Apache Kafka 实例
  • Python 依赖(见安装部分)

安装教程

  1. 克隆仓库: bash git clone <repository-url> cd <repository-directory>

  2. 创建并激活虚拟环境: bash python -m venv venv source venv/bin/activate # Windows 使用: venv\Scripts\activate

  3. 安装依赖: bash pip install -r requirements.txt 若无 requirements.txt,安装以下包: bash pip install aiokafka python-dotenv pydantic-settings mcp-server

配置

在项目根目录创建 .env 文件,配置以下变量:

KAFKA_BOOTSTRAP_SERVERS=localhost:9092
TOPIC_NAME=your-topic-name
IS_TOPIC_READ_FROM_BEGINNING=False
DEFAULT_GROUP_ID_FOR_CONSUMER=kafka-mcp-group

调试方式

运行服务器:

python main.py --transport stdio

可用传输选项: - stdio: 标准输入/输出(默认) - sse: 服务器发送事件

与 Claude Desktop 集成: 在 Claude Desktop 配置文件中添加以下配置:

{
    "mcpServers": {
        "kafka": {
            "command": "python",
            "args": [
                "<PATH TO PROJECTS>/main.py"
            ]
        }
    }
}

替换 <PATH TO PROJECTS> 为项目目录的绝对路径。

许可证

该项目遵循 Apache-2.0 开源许可条款,请参阅 Apache-2.0 了解完整条款。