Development Guide
This guide describes practical development methods and implementation examples for plugins that integrate with BioStar X.
Required endpoint implementation
To develop a plugin integrated with BioStar X, you must implement the following two endpoints. This document uses /bsx as the example path.
Plugin registration endpoint
Endpoint: GET /bsx
The BioStar X Service Manager calls this endpoint when the plugin is registered, to check that the plugin server is reachable.
Request format
GET /bsx
Headers:
X-BioStar-Ping: <ping_token>
Implementation requirements
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()


@app.get("/bsx")
async def bsx_ping(request: Request):
    """Ping endpoint for BioStar X plugin registration"""
    # Header lookup is case-insensitive, so this matches X-BioStar-Ping
    ping_token = request.headers.get("x-biostar-ping")
    # Return ping token in response
    response_data = {"token": ping_token}
    return JSONResponse(content=response_data)
Response format
{
"token": "<ping_token_value>"
}
Testing
You can manually test the endpoint using the curl command.
curl -H "X-BioStar-Ping: test123" http://localhost:8000/bsx
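The same check can be scripted for automated tests. The following is a minimal sketch using only the Python standard library. The helper names `build_ping_request`, `token_echoed`, and `check_ping` are hypothetical and not part of BioStar X; the sketch only assumes the request and response formats shown above.

```python
import json
import urllib.request


def build_ping_request(url: str, token: str) -> urllib.request.Request:
    """Build the GET request that carries the ping token header."""
    return urllib.request.Request(
        url, headers={"X-BioStar-Ping": token}, method="GET"
    )


def token_echoed(body: bytes, token: str) -> bool:
    """Return True if the JSON body echoes the token under the "token" key."""
    try:
        data = json.loads(body.decode("utf-8"))
    except ValueError:
        # Covers both invalid UTF-8 and invalid JSON
        return False
    return isinstance(data, dict) and data.get("token") == token


def check_ping(url: str, token: str, timeout: float = 5.0) -> bool:
    """Send the ping request to a running plugin and check the echoed token."""
    request = build_ping_request(url, token)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status == 200 and token_echoed(response.read(), token)
```

With the plugin running locally, `check_ping("http://localhost:8000/bsx", "test123")` should return True if the endpoint echoes the token correctly.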
User authentication callback endpoint
Endpoint: POST /bsx
BioStar X calls this endpoint when the user clicks the plugin icon in the BioStar X web interface. While handling the call, the plugin requests the session of the user currently logged in to BioStar X and receives it in the response.
A certificate is required for secure communication between BioStar X and the plugin. After registration, you can download the certificate from the plugin's details page.
Request format
POST /bsx
Content-Type: application/x-www-form-urlencoded
user_id={user_id}&plugin_id={plugin_id}
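Frameworks such as FastAPI decode this body automatically. For other frameworks, or for unit tests, the body can be decoded with the standard library. The sketch below is illustrative only: `parse_callback_body` and `REQUIRED_FIELDS` are hypothetical names, and the only assumption is the URL-encoded format shown above.

```python
from urllib.parse import parse_qs

# The two fields the callback request is expected to carry
REQUIRED_FIELDS = ("user_id", "plugin_id")


def parse_callback_body(body: str) -> dict:
    """Decode a URL-encoded callback body and check the required fields.

    Raises ValueError naming the first missing or empty field.
    """
    # keep_blank_values=True so "user_id=" is parsed as present but empty
    fields = parse_qs(body, keep_blank_values=True)
    result = {}
    for name in REQUIRED_FIELDS:
        value = fields.get(name, [""])[0]
        if not value:
            raise ValueError(f"{name} is required")
        result[name] = value
    return result
```

For example, `parse_callback_body("user_id=1&plugin_id=abc")` returns a dictionary with both values, while a body without `plugin_id` raises `ValueError`.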
Implementation requirements
- Parameter Validation: Check required values for user_id and plugin_id
- AES Key Generation: Generate a 32-byte key for secure communication
- Key Encryption: Encrypt the AES key with the RSA public key (assuming the filename is public_key.pem in the example code)
- Session Bridge Request: Request session from the BioStar X server
- Session Decryption: Decrypt the received encrypted session ID with AES
import uuid
import requests
from fastapi import HTTPException, Form

# Placeholder: replace with the host of your BioStar X server
BIOSTAR_SERVER = "biostar.example.com"


@app.post("/bsx")
async def bsx_callback(
    request: Request,
    user_id: str = Form(None, description="User ID"),
    plugin_id: str = Form(None, description="Plugin ID")
):
    """BioStar X plugin callback processing"""
    # 1. Validate required parameters
    if not user_id:
        raise HTTPException(status_code=400, detail="user_id is required")
    if not plugin_id:
        raise HTTPException(status_code=400, detail="plugin_id is required")

    # 2. Generate AES key for secure communication
    key = uuid.uuid4().hex  # 32 hex characters, i.e. 32 bytes when UTF-8 encoded

    # 3. Encrypt key with RSA public key
    encrypted_key = encrypt_with_public_key(key, "public_key.pem")

    # 4. Request session bridge to BioStar server
    payload = {
        "user_id": user_id,
        "plugin_id": plugin_id,
        "key": encrypted_key
    }
    # Note: verify=False disables TLS certificate verification
    response = requests.post(
        f"https://{BIOSTAR_SERVER}/api/session/bridge",
        json=payload,
        verify=False
    )

    # 5. Process response
    if response.status_code == 200:
        data = response.json()
        if data.get("Response", {}).get("code") == "0":
            # Decrypt session ID
            encrypted_session = data.get("bs_session_id")
            bs_session_id = decrypt_aes256_base64(encrypted_session, key)
        else:
            # Handle BioStar X errors
            error_msg = data.get("Response", {}).get("message", "unknown error")
            raise HTTPException(status_code=400, detail=f"BioStar X error: {error_msg}")
    else:
        raise HTTPException(status_code=500, detail="Failed to connect to BioStar X server")
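The response handling in step 5 can be isolated into a small function so that it can be tested without a BioStar X server. The following is a sketch, not an official client: `extract_encrypted_session` is a hypothetical helper, and the response layout (a `Response` object with `code` and `message`, plus a top-level `bs_session_id`) is assumed from the callback example above.

```python
def extract_encrypted_session(data: dict) -> str:
    """Return the encrypted session ID from a session bridge response.

    Raises RuntimeError when the response reports an error or has no ID.
    The response layout is an assumption taken from the example in this guide.
    """
    result = data.get("Response") or {}
    # A code of "0" (as a string) marks success in the example above
    if result.get("code") != "0":
        message = result.get("message", "unknown error")
        raise RuntimeError(f"BioStar X error: {message}")
    encrypted_session = data.get("bs_session_id")
    if not encrypted_session:
        raise RuntimeError("bs_session_id missing from response")
    return encrypted_session
```

Keeping this check separate also guards against a successful response that lacks `bs_session_id`, a case in which the decryption step would otherwise receive `None`.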
Testing
You can manually test the endpoint using the curl command.
curl -X POST http://localhost:8000/bsx \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "user_id={your_user_id}&plugin_id={your_plugin_id}"
Encryption implementation
You must implement the following encryption functions.
RSA public key encryption
import base64

from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding


def encrypt_with_public_key(message: str, public_key_path: str) -> str:
    """Encrypt message with RSA public key"""
    with open(public_key_path, 'rb') as key_file:
        public_key = serialization.load_pem_public_key(key_file.read())
    message_bytes = message.encode('utf-8')
    # PKCS1v15 comes from the asymmetric padding module, not primitives.padding
    encrypted = public_key.encrypt(message_bytes, padding.PKCS1v15())
    return base64.b64encode(encrypted).decode('utf-8')
AES-256 decryption
import base64

from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad


def decrypt_aes256_base64(encrypted_data: str, key: str) -> str:
    """Decrypt base64 data encrypted with AES256"""
    key_bytes = key.encode('utf-8')
    # Ensure 32-byte key: truncate if longer, pad with NUL bytes if shorter
    key_bytes = key_bytes[:32].ljust(32, b'\0')
    cipher_text = base64.b64decode(encrypted_data)
    iv = key_bytes[:16]  # Use first 16 bytes as IV
    cipher = AES.new(key_bytes, AES.MODE_CBC, iv)
    decrypted = unpad(cipher.decrypt(cipher_text), AES.block_size)
    return decrypted.decode('utf-8')
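The key and IV handling in the function above can be checked on its own, without a cryptography library. The sketch below uses only the standard library; `derive_key_and_iv` is a hypothetical helper that mirrors the example (pad or truncate to 32 bytes, then reuse the first 16 key bytes as the IV). That this matches what the BioStar X server expects is an assumption based on the example code.

```python
import uuid
from typing import Tuple


def derive_key_and_iv(key: str) -> Tuple[bytes, bytes]:
    """Return the 32-byte AES key and 16-byte IV derived from a key string.

    Mirrors the decryption example: truncate to 32 bytes or pad with NUL
    bytes, then take the first 16 key bytes as the IV.
    """
    key_bytes = key.encode("utf-8")[:32].ljust(32, b"\0")
    return key_bytes, key_bytes[:16]


# uuid4().hex is 32 ASCII hex characters, so no padding or truncation occurs
key_bytes, iv = derive_key_and_iv(uuid.uuid4().hex)
```

Because a key produced by `uuid.uuid4().hex` is already exactly 32 bytes, the padding and truncation branches only matter for keys from other sources. Note that a UUID4 contains 122 random bits, so such a key carries less entropy than 32 fully random bytes.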