Apache Beam MCP Server

Apache Beam MCP Server

site icon
2025.03.22 3
Python数据管道管理AI 集成开发效率数据库
Apache Beam MCP Server 是一个基于 Model Context Protocol (MCP) 的服务,用于管理跨不同运行器(如 Flink、Spark、Dataflow 和 Direct)的 Apache Beam 数据管道。它提供了一个标准化的 API,适用于数据工程师、AI/LLM 开发者和 DevOps 团队,简化了数据管道的创建、监控和控制。
View on GitHub

Overview

基本能力

产品定位

Apache Beam MCP Server 是一个用于管理 Apache Beam 数据管道的标准化服务,支持多种运行器,并提供 AI 集成能力。

核心功能

  • 多运行器支持:支持 Flink、Spark、Dataflow 和 Direct 运行器。
  • MCP 兼容:遵循 Model Context Protocol,支持 AI 集成。
  • 管道管理:提供创建、监控和控制数据管道的功能。
  • 易于扩展:支持添加新的运行器或自定义功能。
  • 生产就绪:包括 Docker/Kubernetes 部署、监控和扩展功能。

适用场景

  • 数据工程师:通过一致的 API 管理不同运行器的管道。
  • AI/LLM 开发者:通过 MCP 标准实现 AI 控制的数据管道。
  • DevOps 团队:简化管道操作和监控。

工具列表

  • /tools 端点:管理 AI 代理和模型,用于管道处理。
  • /resources 端点:管理数据集和其他管道资源。
  • /contexts 端点:定义管道的执行环境。

常见问题解答

使用教程

使用依赖

# Clone the repository
git clone https://github.com/yourusername/beam-mcp-server.git
cd beam-mcp-server

# Create a virtual environment
python -m venv beam-mcp-venv
source beam-mcp-venv/bin/activate  # On Windows: beam-mcp-venv\Scripts\activate

# Install dependencies
pip install -r requirements.txt

安装教程

# With the Direct runner (no external dependencies)
python main.py --debug --port 8888

# With Flink runner (if you have Flink installed)
CONFIG_PATH=config/flink_config.yaml python main.py --debug --port 8888

调试方式

# Create test input
echo "This is a test file for Apache Beam WordCount example" > /tmp/input.txt

# Submit a job using curl
curl -X POST http://localhost:8888/api/v1/jobs \
  -H "Content-Type: application/json" \
  -d '{
    "job_name": "test-wordcount",
    "runner_type": "direct",
    "job_type": "BATCH",
    "code_path": "examples/pipelines/wordcount.py",
    "pipeline_options": {
      "input_file": "/tmp/input.txt",
      "output_path": "/tmp/output"
    }
  }'

许可证

该项目遵循 MIT 开源许可条款,请参阅 MIT 了解完整条款。