大语言模型MCP学习05-将自定义mcp服务部署到百炼平台02

一、阿里云函数计算

阿里云函数的官网: https://www.aliyun.com/product/fc 。 官方对其的定义与介绍是:函数计算(Function Compute,简称FC)是事件驱动的全托管计算服务。使用函数计算,您无需采购与管理服务器等基础设施,只需编写并上传代码或镜像。函数计算为您准备好计算资源,可靠地运行任务,通过弹性伸缩应对流量峰谷,并提供日志查询、性能监控和报警等功能。 函数计算提供了一种事件驱动的计算模型。您可以在指定函数中创建触发器,该触发器描述了一组规则,当某个事件满足这些规则,事件源就会触发关联的函数

使用Serverless架构的函数计算,只需聚焦于业务本身,平台为用户准备好计算资源,可靠地运行任务,并通过弹性伸缩应对流量峰谷,用户只需为任务实际消耗的资源付费。

从技术上来说,除了阿里云函数计算,其实还有可以使用云服务器(应该是其他厂商的云服务器也可以,但因为要使用百炼平台,所以使用阿里云服务器应该最好)。如果自己已经购买了阿里云服务器,直接使用即可;但如果目前还没有购买,不建议为了测试与试用“在云服务器上部署MCP服务”而购买阿里云服务器,因为有更实惠的选择,它就是阿里云函数计算。

二、实施步骤

2.1 优化服务端代码

大语言模型MCP学习03-MCP实现对MySQL数据库的操作 一文中,笔者描述了如何在cursor中通过本地模式运行与使用自定义MCP服务,我把这个MCP服务命令为“操作MySQL数据库的自定义MCP应用”(其实是MySQL类数据库都可以,即包含MySQL与MariaDB),下文简称为自定义MCP应用。但当时那个版本的"自定义MCP应用"直接将数据库的连接信息配置在源码文件main.py中,不够规范严谨。现对此main.py进行优化,改进优化点如下:

  • 配置灵活性: 支持通过命令行参数、环境变量和默认值配置数据库连接
  • 安全性: 不再将敏感信息硬编码在代码中
  • 可维护性: 配置和代码分离,便于不同环境的部署
  • 兼容性: 支持本地部署和远程部署

另一方面,改进优化支持通过命令参数方式、系统环境变量、默认值3种方式配置数据库连接信息,优化级逐渐降低:

  • 命令行参数 (--host, --port, --user, --password, --database)
  • 环境变量 (MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
  • 默认值 (localhost, 3306, root, 'root@123', 'test01')

如下是改进优化的main.py源文件内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
#!/usr/bin/env python3
"""
MySQL MCP Server - 配置优化版本
提供MySQL数据库的创建表、增删改查等操作功能
支持通过环境变量和命令行参数配置数据库连接
"""

import asyncio
import json
import logging
import sys
import os
import argparse
from typing import Any, Dict, List, Optional

import aiomysql
from mcp.server.fastmcp import FastMCP
from mcp.server.stdio import stdio_server

# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stderr) # 输出到stderr避免与MCP通信冲突
]
)
logger = logging.getLogger(__name__)

## 从环境变量获取端口,如果没有则使用默认端口
port = int(os.environ.get('FC_SERVER_PORT', '9000'))
host = os.environ.get('FC_SERVER_HOST', '0.0.0.0')
## 在创建FastMCP实例时传递host和port参数
app = FastMCP("mysql-mcp-server", host=host, port=port)

class MySQLManager:
def __init__(self, host=None, port=None, user=None, password=None, database=None):
"""
初始化MySQL管理器

Args:
host: 数据库主机地址
port: 数据库端口
user: 数据库用户名
password: 数据库密码
database: 数据库名称
"""
# 优先使用传入的参数,然后是环境变量,最后是默认值
self.db_config = {
'host': host or os.getenv('MYSQL_HOST', 'localhost'),
'port': int(port or os.getenv('MYSQL_PORT', 3306)),
'user': user or os.getenv('MYSQL_USER', 'root'),
'password': password or os.getenv('MYSQL_PASSWORD', 'root@123'),
'db': database or os.getenv('MYSQL_DATABASE', 'test01'),
'charset': 'utf8mb4',
'autocommit': True
}
self.pool = None
self._lock = asyncio.Lock()

# 验证必要的配置
if not self.db_config['password']:
logger.warning("数据库密码为空,请确保这是预期的配置")

def get_config_info(self):
"""获取配置信息(隐藏密码)"""
config_info = self.db_config.copy()
config_info['password'] = '*' * len(config_info['password']) if config_info['password'] else 'Empty'
return config_info

async def get_connection(self):
"""获取数据库连接"""
async with self._lock:
if not self.pool:
try:
logger.info(f"正在连接数据库: {self.db_config['host']}:{self.db_config['port']}")
self.pool = await aiomysql.create_pool(
host=self.db_config['host'],
port=self.db_config['port'],
user=self.db_config['user'],
password=self.db_config['password'],
db=self.db_config['db'],
charset=self.db_config['charset'],
autocommit=self.db_config['autocommit'],
minsize=1,
maxsize=5,
connect_timeout=10,
pool_recycle=3600
)
logger.info("数据库连接池创建成功")
except Exception as e:
logger.error(f"创建数据库连接池失败: {e}")
raise

try:
conn = await self.pool.acquire()
return conn
except Exception as e:
logger.error(f"获取数据库连接失败: {e}")
raise

async def release_connection(self, conn):
"""释放数据库连接"""
if conn and self.pool:
try:
await self.pool.release(conn)
except Exception as e:
logger.error(f"释放数据库连接失败: {e}")

async def close_pool(self):
"""关闭连接池"""
if self.pool:
try:
self.pool.close()
await self.pool.wait_closed()
self.pool = None
logger.info("数据库连接池已关闭")
except Exception as e:
logger.error(f"关闭连接池失败: {e}")

# 全局MySQL管理器实例(将在main函数中初始化)
mysql_manager = None

@app.tool()
async def create_table(table_name: str, columns: List[Dict[str, str]]) -> str:
"""
创建MySQL数据表

Args:
table_name: 表名
columns: 列定义数组,每个元素包含name(列名)、type(数据类型)、constraints(约束条件,可选)

Returns:
创建结果消息
"""
conn = None
try:
conn = await mysql_manager.get_connection()
async with conn.cursor() as cursor:
# 构建CREATE TABLE语句
column_defs = []
for col in columns:
col_def = f"`{col['name']}` {col['type']}"
if col.get('constraints'):
col_def += f" {col['constraints']}"
column_defs.append(col_def)

sql = f"CREATE TABLE IF NOT EXISTS `{table_name}` ({', '.join(column_defs)})"
await cursor.execute(sql)
logger.info(f"表 '{table_name}' 创建成功")
return f"表 '{table_name}' 创建成功"

except Exception as e:
error_msg = f"创建表失败: {str(e)}"
logger.error(error_msg)
return error_msg
finally:
if conn:
await mysql_manager.release_connection(conn)

@app.tool()
async def insert_data(table_name: str, data: Dict[str, Any]) -> str:
"""
向表中插入数据

Args:
table_name: 表名
data: 要插入的数据,键值对形式

Returns:
插入结果消息
"""
conn = None
try:
conn = await mysql_manager.get_connection()
async with conn.cursor() as cursor:
columns = list(data.keys())
values = list(data.values())
placeholders = ', '.join(['%s'] * len(values))
column_names = ', '.join([f"`{col}`" for col in columns])

sql = f"INSERT INTO `{table_name}` ({column_names}) VALUES ({placeholders})"
await cursor.execute(sql, values)
result_msg = f"数据插入成功,受影响行数: {cursor.rowcount}"
logger.info(result_msg)
return result_msg

except Exception as e:
error_msg = f"插入数据失败: {str(e)}"
logger.error(error_msg)
return error_msg
finally:
if conn:
await mysql_manager.release_connection(conn)

@app.tool()
async def select_data(table_name: str, columns: Optional[List[str]] = None,
where_clause: str = "", limit: int = 100) -> str:
"""
查询表中的数据

Args:
table_name: 表名
columns: 要查询的列名,默认为所有列
where_clause: WHERE条件子句
limit: 限制返回行数

Returns:
查询结果的JSON字符串
"""
if columns is None:
columns = ["*"]

conn = None
try:
conn = await mysql_manager.get_connection()
async with conn.cursor(aiomysql.DictCursor) as cursor:
if columns == ["*"]:
column_str = "*"
else:
column_str = ', '.join([f"`{col}`" for col in columns])

sql = f"SELECT {column_str} FROM `{table_name}`"
if where_clause:
sql += f" WHERE {where_clause}"
sql += f" LIMIT {limit}"

await cursor.execute(sql)
results = await cursor.fetchall()

if not results:
return "未找到数据"

# 格式化结果
formatted_results = []
for row in results:
formatted_results.append(dict(row))

return json.dumps(formatted_results, ensure_ascii=False, indent=2, default=str)

except Exception as e:
error_msg = f"查询数据失败: {str(e)}"
logger.error(error_msg)
return error_msg
finally:
if conn:
await mysql_manager.release_connection(conn)

@app.tool()
async def update_data(table_name: str, data: Dict[str, Any], where_clause: str) -> str:
"""
更新表中的数据

Args:
table_name: 表名
data: 要更新的数据,键值对形式
where_clause: WHERE条件子句

Returns:
更新结果消息
"""
conn = None
try:
conn = await mysql_manager.get_connection()
async with conn.cursor() as cursor:
set_clauses = [f"`{key}` = %s" for key in data.keys()]
set_clause = ', '.join(set_clauses)
values = list(data.values())

sql = f"UPDATE `{table_name}` SET {set_clause} WHERE {where_clause}"
await cursor.execute(sql, values)
result_msg = f"数据更新成功,受影响行数: {cursor.rowcount}"
logger.info(result_msg)
return result_msg

except Exception as e:
error_msg = f"更新数据失败: {str(e)}"
logger.error(error_msg)
return error_msg
finally:
if conn:
await mysql_manager.release_connection(conn)

@app.tool()
async def delete_data(table_name: str, where_clause: str) -> str:
"""
删除表中的数据

Args:
table_name: 表名
where_clause: WHERE条件子句

Returns:
删除结果消息
"""
conn = None
try:
conn = await mysql_manager.get_connection()
async with conn.cursor() as cursor:
sql = f"DELETE FROM `{table_name}` WHERE {where_clause}"
await cursor.execute(sql)
result_msg = f"数据删除成功,受影响行数: {cursor.rowcount}"
logger.info(result_msg)
return result_msg

except Exception as e:
error_msg = f"删除数据失败: {str(e)}"
logger.error(error_msg)
return error_msg
finally:
if conn:
await mysql_manager.release_connection(conn)

@app.tool()
async def show_tables() -> str:
"""
显示所有数据表

Returns:
表列表字符串
"""
conn = None
try:
conn = await mysql_manager.get_connection()
async with conn.cursor() as cursor:
await cursor.execute("SHOW TABLES")
tables = await cursor.fetchall()

if not tables:
return "数据库中没有表"

table_list = [table[0] for table in tables]
return f"数据库中的表({len(table_list)}个):\n" + '\n'.join([f"- {table}" for table in table_list])

except Exception as e:
error_msg = f"显示表失败: {str(e)}"
logger.error(error_msg)
return error_msg
finally:
if conn:
await mysql_manager.release_connection(conn)

@app.tool()
async def get_database_info() -> str:
"""
获取当前数据库连接信息和状态

Returns:
数据库信息
"""
conn = None
try:
conn = await mysql_manager.get_connection()
async with conn.cursor() as cursor:
# 获取数据库版本
await cursor.execute("SELECT VERSION()")
version = await cursor.fetchone()

# 获取当前数据库
await cursor.execute("SELECT DATABASE()")
current_db = await cursor.fetchone()

# 获取连接信息
config_info = mysql_manager.get_config_info()
info = {
**config_info,
"mysql_version": version[0] if version else "Unknown",
"current_database": current_db[0] if current_db else "None",
"connection_status": "Connected"
}

return json.dumps(info, ensure_ascii=False, indent=2)

except Exception as e:
error_msg = f"获取数据库信息失败: {str(e)}"
logger.error(error_msg)
return error_msg
finally:
if conn:
await mysql_manager.release_connection(conn)

async def test_connection():
"""测试数据库连接"""
try:
logger.info("正在测试数据库连接...")
conn = await mysql_manager.get_connection()
async with conn.cursor() as cursor:
await cursor.execute("SELECT 1")
result = await cursor.fetchone()
if result[0] == 1:
logger.info("数据库连接测试成功")
return True
await mysql_manager.release_connection(conn)
except Exception as e:
logger.error(f"数据库连接测试失败: {e}")
return False
return False

def parse_arguments():
"""解析命令行参数,如果未提供则从环境变量中读取"""
parser = argparse.ArgumentParser(description='MySQL MCP Server')
parser.add_argument('--host', type=str, help='MySQL host (default: localhost)')
parser.add_argument('--port', type=int, help='MySQL port (default: 3306)')
parser.add_argument('--user', type=str, help='MySQL user (default: root)')
parser.add_argument('--password', type=str, help='MySQL password')
parser.add_argument('--database', type=str, help='MySQL database name (default: test01)')

args = parser.parse_args()

# 如果命令行参数未提供,则从环境变量中读取
if args.host is None:
args.host = os.getenv('MYSQL_HOST')
if args.port is None:
port_env = os.getenv('MYSQL_PORT')
if port_env:
args.port = int(port_env)
if args.user is None:
args.user = os.getenv('MYSQL_USER')
if args.password is None:
args.password = os.getenv('MYSQL_PASSWORD')
if args.database is None:
args.database = os.getenv('MYSQL_DATABASE')

return args

async def main():
"""主函数"""
global mysql_manager

try:
logger.info("MySQL MCP Server 启动中...")

# 解析命令行参数
args = parse_arguments()

# 初始化MySQL管理器
mysql_manager = MySQLManager(
host=args.host,
port=args.port,
user=args.user,
password=args.password,
database=args.database
)

# 显示配置信息
config_info = mysql_manager.get_config_info()
logger.info(f"数据库配置: {config_info}")

# 测试数据库连接
if not await test_connection():
logger.error("数据库连接失败,服务器无法启动")
return

# 使用stdio服务器运行MCP应用
async with stdio_server() as (read_stream, write_stream):
logger.info("MCP服务器正在运行...")
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)

except KeyboardInterrupt:
logger.info("服务器收到中断信号,正在关闭...")
except Exception as e:
logger.error(f"服务器运行错误: {e}", exc_info=True)
finally:
# 清理资源
if mysql_manager:
await mysql_manager.close_pool()
logger.info("服务器已关闭")

if __name__ == "__main__":
try:
# 解析命令行参数并初始化MySQL管理器
args = parse_arguments()
mysql_manager = MySQLManager(
host=args.host,
port=args.port,
user=args.user,
password=args.password,
database=args.database
)

# 运行FastMCP应用
app.run(transport="stdio")
except Exception as e:
logger.error(f"程序启动失败: {e}", exc_info=True)
sys.exit(1)

2.2 在本地测/调试MCP服务

1
(manipulate_mysql) E:\Hdisk\学习\06_AI学习\13_LatestTechKnowledge\01_MCP\try01\manipulate_mysql>mcp dev main.py
image-20250525211855439

本地浏览器访问http://127.0.0.1:6274,看到如下界面,根据实际情况修改Arguments的值

image-20250525212002117
image-20250525212217581

2.3 cursor中配置自定义MCP服务端

修改mcp.json文件:

image-20250525212453988

image-20250525212540322

2.4 cursor中使用自定义MCP服务端

image-20250525212845801

image-20250525213213756

image-20250525213302763

2.5 将此自定义mcp服务部署到阿里云函数计算平台

打开阿里云函数计算概览界面: https://fcnext.console.aliyun.com/overview ,点击左侧的“FunctionAI” 功能按钮,进入“云原生应用开发平台 CAP 控制台”页面,然后点击此页面左侧的“MCP广场”,进入“MCP广场|FunctionAI”页面后,点击右上角的“创建自定义 MCP Server”按钮。

  • 创建自定义MCP Server:
image-20250525143522255
  • 新建服务:
image-20250525213526806

2.5.1 基础配置

image-20250525214458965

2.5.2 MCP服务配置

image-20250525215030927

其中manipulate_mysql.zip是将main.py打包压缩成一个zip文件,在前面基础做如下修改:运行FastMCP应用的相关语句的参数"transport"修改成"sse",并指定主机IP与端口默认值分别 为0.0.0.0、9000。如下是修改后的main.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
#!/usr/bin/env python3
"""
MySQL MCP Server - 配置优化版本
提供MySQL数据库的创建表、增删改查等操作功能
支持通过环境变量和命令行参数配置数据库连接
"""

import asyncio
import json
import logging
import sys
import os
import argparse
from typing import Any, Dict, List, Optional

import aiomysql
from mcp.server.fastmcp import FastMCP
from mcp.server.stdio import stdio_server

# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stderr) # 输出到stderr避免与MCP通信冲突
]
)
logger = logging.getLogger(__name__)

## 从环境变量获取端口,如果没有则使用默认端口
port = int(os.environ.get('FC_SERVER_PORT', '9000'))
host = os.environ.get('FC_SERVER_HOST', '0.0.0.0')
# 在创建FastMCP实例时传递host和port参数
app = FastMCP("mysql-mcp-server", host=host, port=port)
#app = FastMCP("mysql-mcp-server")

class MySQLManager:
def __init__(self, host=None, port=None, user=None, password=None, database=None):
"""
初始化MySQL管理器

Args:
host: 数据库主机地址
port: 数据库端口
user: 数据库用户名
password: 数据库密码
database: 数据库名称
"""
# 优先使用传入的参数,然后是环境变量,最后是默认值
self.db_config = {
'host': host or os.getenv('MYSQL_HOST', 'localhost'),
'port': int(port or os.getenv('MYSQL_PORT', 3306)),
'user': user or os.getenv('MYSQL_USER', 'root'),
'password': password or os.getenv('MYSQL_PASSWORD', 'root@123'),
'db': database or os.getenv('MYSQL_DATABASE', 'test01'),
'charset': 'utf8mb4',
'autocommit': True
}
self.pool = None
self._lock = asyncio.Lock()

# 验证必要的配置
if not self.db_config['password']:
logger.warning("数据库密码为空,请确保这是预期的配置")

def get_config_info(self):
"""获取配置信息(隐藏密码)"""
config_info = self.db_config.copy()
config_info['password'] = '*' * len(config_info['password']) if config_info['password'] else 'Empty'
return config_info

async def get_connection(self):
"""获取数据库连接"""
async with self._lock:
if not self.pool:
try:
logger.info(f"正在连接数据库: {self.db_config['host']}:{self.db_config['port']}")
self.pool = await aiomysql.create_pool(
host=self.db_config['host'],
port=self.db_config['port'],
user=self.db_config['user'],
password=self.db_config['password'],
db=self.db_config['db'],
charset=self.db_config['charset'],
autocommit=self.db_config['autocommit'],
minsize=1,
maxsize=5,
connect_timeout=10,
pool_recycle=3600
)
logger.info("数据库连接池创建成功")
except Exception as e:
logger.error(f"创建数据库连接池失败: {e}")
raise

try:
conn = await self.pool.acquire()
return conn
except Exception as e:
logger.error(f"获取数据库连接失败: {e}")
raise

async def release_connection(self, conn):
"""释放数据库连接"""
if conn and self.pool:
try:
await self.pool.release(conn)
except Exception as e:
logger.error(f"释放数据库连接失败: {e}")

async def close_pool(self):
"""关闭连接池"""
if self.pool:
try:
self.pool.close()
await self.pool.wait_closed()
self.pool = None
logger.info("数据库连接池已关闭")
except Exception as e:
logger.error(f"关闭连接池失败: {e}")

# 全局MySQL管理器实例(将在main函数中初始化)
mysql_manager = None

@app.tool()
async def create_table(table_name: str, columns: List[Dict[str, str]]) -> str:
"""
创建MySQL数据表

Args:
table_name: 表名
columns: 列定义数组,每个元素包含name(列名)、type(数据类型)、constraints(约束条件,可选)

Returns:
创建结果消息
"""
conn = None
try:
conn = await mysql_manager.get_connection()
async with conn.cursor() as cursor:
# 构建CREATE TABLE语句
column_defs = []
for col in columns:
col_def = f"`{col['name']}` {col['type']}"
if col.get('constraints'):
col_def += f" {col['constraints']}"
column_defs.append(col_def)

sql = f"CREATE TABLE IF NOT EXISTS `{table_name}` ({', '.join(column_defs)})"
await cursor.execute(sql)
logger.info(f"表 '{table_name}' 创建成功")
return f"表 '{table_name}' 创建成功"

except Exception as e:
error_msg = f"创建表失败: {str(e)}"
logger.error(error_msg)
return error_msg
finally:
if conn:
await mysql_manager.release_connection(conn)

@app.tool()
async def insert_data(table_name: str, data: Dict[str, Any]) -> str:
"""
向表中插入数据

Args:
table_name: 表名
data: 要插入的数据,键值对形式

Returns:
插入结果消息
"""
conn = None
try:
conn = await mysql_manager.get_connection()
async with conn.cursor() as cursor:
columns = list(data.keys())
values = list(data.values())
placeholders = ', '.join(['%s'] * len(values))
column_names = ', '.join([f"`{col}`" for col in columns])

sql = f"INSERT INTO `{table_name}` ({column_names}) VALUES ({placeholders})"
await cursor.execute(sql, values)
result_msg = f"数据插入成功,受影响行数: {cursor.rowcount}"
logger.info(result_msg)
return result_msg

except Exception as e:
error_msg = f"插入数据失败: {str(e)}"
logger.error(error_msg)
return error_msg
finally:
if conn:
await mysql_manager.release_connection(conn)

@app.tool()
async def select_data(table_name: str, columns: Optional[List[str]] = None,
where_clause: str = "", limit: int = 100) -> str:
"""
查询表中的数据

Args:
table_name: 表名
columns: 要查询的列名,默认为所有列
where_clause: WHERE条件子句
limit: 限制返回行数

Returns:
查询结果的JSON字符串
"""
if columns is None:
columns = ["*"]

conn = None
try:
conn = await mysql_manager.get_connection()
async with conn.cursor(aiomysql.DictCursor) as cursor:
if columns == ["*"]:
column_str = "*"
else:
column_str = ', '.join([f"`{col}`" for col in columns])

sql = f"SELECT {column_str} FROM `{table_name}`"
if where_clause:
sql += f" WHERE {where_clause}"
sql += f" LIMIT {limit}"

await cursor.execute(sql)
results = await cursor.fetchall()

if not results:
return "未找到数据"

# 格式化结果
formatted_results = []
for row in results:
formatted_results.append(dict(row))

return json.dumps(formatted_results, ensure_ascii=False, indent=2, default=str)

except Exception as e:
error_msg = f"查询数据失败: {str(e)}"
logger.error(error_msg)
return error_msg
finally:
if conn:
await mysql_manager.release_connection(conn)

@app.tool()
async def update_data(table_name: str, data: Dict[str, Any], where_clause: str) -> str:
"""
更新表中的数据

Args:
table_name: 表名
data: 要更新的数据,键值对形式
where_clause: WHERE条件子句

Returns:
更新结果消息
"""
conn = None
try:
conn = await mysql_manager.get_connection()
async with conn.cursor() as cursor:
set_clauses = [f"`{key}` = %s" for key in data.keys()]
set_clause = ', '.join(set_clauses)
values = list(data.values())

sql = f"UPDATE `{table_name}` SET {set_clause} WHERE {where_clause}"
await cursor.execute(sql, values)
result_msg = f"数据更新成功,受影响行数: {cursor.rowcount}"
logger.info(result_msg)
return result_msg

except Exception as e:
error_msg = f"更新数据失败: {str(e)}"
logger.error(error_msg)
return error_msg
finally:
if conn:
await mysql_manager.release_connection(conn)

@app.tool()
async def delete_data(table_name: str, where_clause: str) -> str:
"""
删除表中的数据

Args:
table_name: 表名
where_clause: WHERE条件子句

Returns:
删除结果消息
"""
conn = None
try:
conn = await mysql_manager.get_connection()
async with conn.cursor() as cursor:
sql = f"DELETE FROM `{table_name}` WHERE {where_clause}"
await cursor.execute(sql)
result_msg = f"数据删除成功,受影响行数: {cursor.rowcount}"
logger.info(result_msg)
return result_msg

except Exception as e:
error_msg = f"删除数据失败: {str(e)}"
logger.error(error_msg)
return error_msg
finally:
if conn:
await mysql_manager.release_connection(conn)

@app.tool()
async def show_tables() -> str:
"""
显示所有数据表

Returns:
表列表字符串
"""
conn = None
try:
conn = await mysql_manager.get_connection()
async with conn.cursor() as cursor:
await cursor.execute("SHOW TABLES")
tables = await cursor.fetchall()

if not tables:
return "数据库中没有表"

table_list = [table[0] for table in tables]
return f"数据库中的表({len(table_list)}个):\n" + '\n'.join([f"- {table}" for table in table_list])

except Exception as e:
error_msg = f"显示表失败: {str(e)}"
logger.error(error_msg)
return error_msg
finally:
if conn:
await mysql_manager.release_connection(conn)

@app.tool()
async def get_database_info() -> str:
"""
获取当前数据库连接信息和状态

Returns:
数据库信息
"""
conn = None
try:
conn = await mysql_manager.get_connection()
async with conn.cursor() as cursor:
# 获取数据库版本
await cursor.execute("SELECT VERSION()")
version = await cursor.fetchone()

# 获取当前数据库
await cursor.execute("SELECT DATABASE()")
current_db = await cursor.fetchone()

# 获取连接信息
config_info = mysql_manager.get_config_info()
info = {
**config_info,
"mysql_version": version[0] if version else "Unknown",
"current_database": current_db[0] if current_db else "None",
"connection_status": "Connected"
}

return json.dumps(info, ensure_ascii=False, indent=2)

except Exception as e:
error_msg = f"获取数据库信息失败: {str(e)}"
logger.error(error_msg)
return error_msg
finally:
if conn:
await mysql_manager.release_connection(conn)

async def test_connection():
"""测试数据库连接"""
try:
logger.info("正在测试数据库连接...")
conn = await mysql_manager.get_connection()
async with conn.cursor() as cursor:
await cursor.execute("SELECT 1")
result = await cursor.fetchone()
if result[0] == 1:
logger.info("数据库连接测试成功")
return True
await mysql_manager.release_connection(conn)
except Exception as e:
logger.error(f"数据库连接测试失败: {e}")
return False
return False

def parse_arguments():
"""解析命令行参数,如果未提供则从环境变量中读取"""
parser = argparse.ArgumentParser(description='MySQL MCP Server')
parser.add_argument('--host', type=str, help='MySQL host (default: localhost)')
parser.add_argument('--port', type=int, help='MySQL port (default: 3306)')
parser.add_argument('--user', type=str, help='MySQL user (default: root)')
parser.add_argument('--password', type=str, help='MySQL password')
parser.add_argument('--database', type=str, help='MySQL database name (default: test01)')

args = parser.parse_args()

# 如果命令行参数未提供,则从环境变量中读取
if args.host is None:
args.host = os.getenv('MYSQL_HOST')
if args.port is None:
port_env = os.getenv('MYSQL_PORT')
if port_env:
args.port = int(port_env)
if args.user is None:
args.user = os.getenv('MYSQL_USER')
if args.password is None:
args.password = os.getenv('MYSQL_PASSWORD')
if args.database is None:
args.database = os.getenv('MYSQL_DATABASE')

return args

async def main():
"""主函数"""
global mysql_manager

try:
logger.info("MySQL MCP Server 启动中...")

# 解析命令行参数
args = parse_arguments()

# 初始化MySQL管理器
mysql_manager = MySQLManager(
host=args.host,
port=args.port,
user=args.user,
password=args.password,
database=args.database
)

# 显示配置信息
config_info = mysql_manager.get_config_info()
logger.info(f"数据库配置: {config_info}")

# 测试数据库连接
if not await test_connection():
logger.error("数据库连接失败,服务器无法启动")
return

# 使用stdio服务器运行MCP应用
async with stdio_server() as (read_stream, write_stream):
logger.info("MCP服务器正在运行...")
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)

except KeyboardInterrupt:
logger.info("服务器收到中断信号,正在关闭...")
except Exception as e:
logger.error(f"服务器运行错误: {e}", exc_info=True)
finally:
# 清理资源
if mysql_manager:
await mysql_manager.close_pool()
logger.info("服务器已关闭")

if __name__ == "__main__":
try:
# 解析命令行参数并初始化MySQL管理器
args = parse_arguments()
mysql_manager = MySQLManager(
host=args.host,
port=args.port,
user=args.user,
password=args.password,
database=args.database
)

# 运行FastMCP应用
app.run(transport="sse")
except Exception as e:
logger.error(f"程序启动失败: {e}", exc_info=True)
sys.exit(1)

2.5.3 资源配置

image-20250525215301013

其余配置保持默认。然后点击“预览与部署”按钮以查看与执行部署:

image-20250525215350656

此时会报错,并提示如下信息:No module named 'aiomysql',打开 WebIDE中的Terminal,执行如下命令

1
pip install aiomysql mcp uvicorn -t .

保存后重新部署即可。

image-20250525230119824

2.5.4 服务变量

就是添加系统环境变量。分别是MySQL或MariaDB数据库的端口与IP(此处是做了外网映射后的值)

image-20250525232944864

2.5.5 服务测试

image-20250525233102049

2.6 MCP管理中创建自定义MCP 服务

image-20250525231011726
image-20250525231135510
image-20250525233130022

2.7 创建智能体应用

image-20250525231305016
image-20250525231412600

配置提示词:

image-20250525233834258

2.8 试用智能体包含MCP服务在内的能力

image-20250525234318728
image-20250525234355455
image-20250525234527656

2.9 发布应用

image-20250525234646826

大语言模型MCP学习05-将自定义mcp服务部署到百炼平台02
https://jiangsanyin.github.io/2025/05/25/大语言模型MCP学习05-将自定义mcp服务部署到百炼平台02/
作者
sanyinjiang
发布于
2025年5月25日
许可协议