Skip to content

Commit feac639

Browse files
feat(pluginsdk): support external HTTP credential plugins (#656)
Add a generic SDK + gRPC runtime so credential plugins can expose HTTP injection, dashboard secret slots, agent env pushdown, and OAuth flow metadata out of tree, keeping provider-specific logic out of core. * pluginsdk.CredentialDef can declare HTTP injection and return per-instance metadata (secret slots, env vars, OAuth, disambiguators) from Build, and may implement InjectHTTP to stamp request headers at proxy time. * The external-plugin adapter maps that metadata onto the existing secret-slot, env-pushdown, OAuth-flow, and HTTP-credential interfaces; the built-in HTTPS endpoint routes placeholder-matched requests through the plugin's InjectHTTP, applies the returned header mutations, and consumes returned redactions for audit samples. * Dynamic MCP OAuth is selected by the credential flow (dynamic_mcp) rather than provider-hostname checks; the auth-code exchange uses public-client PKCE and tolerates pasted callback URLs and reordered query parameters. * Plugin callbacks (Build, InjectHTTP, HandleConn, tunnel Open/Close/Dial) recover from panics, header mutations are validated, plugin-returned metadata shape is checked, and derived secrets are redacted from audit samples.
1 parent c0e0851 commit feac639

21 files changed

Lines changed: 3793 additions & 419 deletions

cmd/clawpatrol/extplugin_http_credential_test.go

Lines changed: 400 additions & 0 deletions
Large diffs are not rendered by default.

cmd/clawpatrol/integrations_test.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,13 @@ func TestFetchEnvPushdownErrors(t *testing.T) {
114114
// flat list.
115115
func TestEnvPushdownVarsServerDriven(t *testing.T) {
116116
prev := envPushdownGatewayFetcher
117+
prevDaemon := envPushdownDaemonFetcher
117118
envPushdownGatewayFetcher = fetchEnvPushdownFromGateway
118-
t.Cleanup(func() { envPushdownGatewayFetcher = prev })
119+
envPushdownDaemonFetcher = nil
120+
t.Cleanup(func() {
121+
envPushdownGatewayFetcher = prev
122+
envPushdownDaemonFetcher = prevDaemon
123+
})
119124

120125
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
121126
_ = json.NewEncoder(w).Encode(map[string]any{
@@ -162,8 +167,13 @@ func TestEnvPushdownVarsErrorReturnsCAOnly(t *testing.T) {
162167
// the host's running NE happens to have to say, instead of the
163168
// gateway-URL-missing error this test exercises.
164169
prev := envPushdownGatewayFetcher
170+
prevDaemon := envPushdownDaemonFetcher
165171
envPushdownGatewayFetcher = fetchEnvPushdownFromGateway
166-
t.Cleanup(func() { envPushdownGatewayFetcher = prev })
172+
envPushdownDaemonFetcher = nil
173+
t.Cleanup(func() {
174+
envPushdownGatewayFetcher = prev
175+
envPushdownDaemonFetcher = prevDaemon
176+
})
167177

168178
dir := t.TempDir()
169179
caPath := filepath.Join(dir, "ca.crt")

cmd/clawpatrol/main.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2417,8 +2417,16 @@ func (g *Gateway) mitmHTTPSWithCertHost(c net.Conn, host, certHost string, ep *c
24172417
}
24182418
case wantsHTTP:
24192419
reqBodySecretRedactions = appendCredentialSecretRedactions(reqBodySecretRedactions, sec)
2420+
// Match existing request-signing behavior: an injection failure is logged,
2421+
// then the request continues with the agent's placeholder. The upstream
2422+
// service should reject that placeholder without exposing gateway secrets.
24202423
if err := injector.InjectHTTP(req.Context(), req, sec); err != nil {
2421-
log.Printf("inject %s: %v", cc.Credential.Symbol.Name, err)
2424+
log.Printf("inject %s: %v; forwarding without injection", cc.Credential.Symbol.Name, err)
2425+
}
2426+
if rp, ok := injector.(runtime.HTTPCredentialRedactionProvider); ok {
2427+
for _, secret := range rp.ConsumeHTTPRedactions(req) {
2428+
reqBodySecretRedactions = appendCredentialSecretRedaction(reqBodySecretRedactions, secret)
2429+
}
24222430
}
24232431
}
24242432
if wantsWS && isWSUpgrade(req) {
@@ -2598,7 +2606,7 @@ func (g *Gateway) mitmHTTPSWithCertHost(c net.Conn, host, certHost string, ep *c
25982606
}
25992607
}
26002608
ev.Status = resp.StatusCode
2601-
ev.ReqHeaders = flatHeaders(req.Header)
2609+
ev.ReqHeaders = flatHeadersRedacted(req.Header, reqBodySecretRedactions)
26022610
ev.In = reqS.n
26032611
ev.Out = respS.n
26042612
ev.ReqSha = reqS.sha()

cmd/clawpatrol/oauth.go

Lines changed: 137 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,12 @@ type oauthState struct {
5656
displayName string // human-readable name (e.g. github login)
5757
avatarURL string // dashboard pfp
5858
// clientID is the dynamically-registered OAuth client_id for flows
59-
// that use RFC 7591 (notion_mcp). Static-ClientID flows (github,
60-
// anthropic, codex) leave this empty and use cfg.ClientID. Persisted
61-
// in the credentials table alongside the tokens so refresh works
62-
// across gateway restarts.
59+
// that use RFC 7591 (notion_mcp/dynamic_mcp). Static-ClientID flows
60+
// (github, anthropic, codex) leave this empty and use cfg.ClientID.
61+
// Persisted in the credentials table alongside the tokens so refresh
62+
// works across gateway restarts.
6363
clientID string
64+
flow string
6465
db *sql.DB
6566
mu sync.Mutex
6667
}
@@ -218,8 +219,8 @@ func (r *OAuthRegistry) Set(ctx context.Context, id string, tok *oauth2.Token) e
218219

219220
// SetWithClient is Set + a dynamically registered client_id. Pass empty
220221
// clientID for static-ClientID flows; pass the per-credential client_id
221-
// for RFC 7591 dynamic registration flows (notion_mcp). The clientID is
222-
// stamped onto the in-memory state and persisted alongside the tokens so
222+
// for RFC 7591 dynamic registration flows (notion_mcp/dynamic_mcp). The
223+
// clientID is stamped onto the in-memory state and persisted alongside the tokens so
223224
// refresh continues to work after restart.
224225
//
225226
// The userinfo fetch (fetchOAuthProfile) runs OUTSIDE the registry lock
@@ -335,6 +336,7 @@ func newState(it *OAuthIntegration, db *sql.DB) *oauthState {
335336
header: header,
336337
prefix: prefix,
337338
id: it.ID,
339+
flow: it.Flow,
338340
db: db,
339341
}
340342
}
@@ -347,11 +349,13 @@ func (s *oauthState) setToken(tok *oauth2.Token) {
347349
// (returns "Invalid request format" otherwise). Stdlib oauth2
348350
// only sends form-urlencoded.
349351
base = &anthropicRefreshSource{cfg: s.cfg, current: tok}
350-
case isNotionMCPTokenURL(s.cfg.Endpoint.TokenURL):
351-
// Notion's MCP token endpoint refreshes via form-urlencoded body
352-
// and expects the dynamically registered client_id (no static
353-
// ClientSecret — PKCE-only public client).
354-
base = &notionMCPRefreshSource{cfg: s.cfg, current: tok}
352+
case s.flow == "dynamic_mcp" || s.flow == "notion_mcp":
353+
// Hosted MCP token endpoints refresh via form-urlencoded body and
354+
// expect the dynamically registered client_id (no static
355+
// ClientSecret — PKCE-only public client). The flow, not the
356+
// provider hostname, selects this behavior so external credential
357+
// plugins can supply their own MCP OAuth endpoints.
358+
base = &dynamicMCPRefreshSource{cfg: s.cfg, current: tok}
355359
default:
356360
base = s.cfg.TokenSource(context.Background(), tok)
357361
}
@@ -535,7 +539,7 @@ func (r *OAuthRegistry) loadFromDB() error {
535539
}
536540
s := newState(it, r.db)
537541
// Restore the dynamically-registered client_id BEFORE setToken
538-
// so the refresh source (notion_mcp) picks it up via s.cfg.
542+
// so dynamic MCP refresh sources pick it up via s.cfg.
539543
if clientID.Valid && clientID.String != "" {
540544
s.clientID = clientID.String
541545
s.cfg.ClientID = clientID.String
@@ -566,8 +570,9 @@ type oauthSession struct {
566570
id string
567571
created time.Time
568572
// dynClientID is the RFC 7591 client_id this session registered at
569-
// start time (notion_mcp). Empty for static-ClientID flows. Stamped
570-
// onto the credential row at exchange time so refresh can replay it.
573+
// start time (dynamic MCP flows). Empty for static-ClientID flows.
574+
// Stamped onto the credential row at exchange time so refresh can
575+
// replay it.
571576
dynClientID string
572577
}
573578

@@ -646,8 +651,8 @@ func (w *webMux) apiOAuthStart(rw http.ResponseWriter, r *http.Request) {
646651
w.startOpenAIDeviceFlow(rw, r, id, flow)
647652
return
648653
}
649-
if flow.Flow == "notion_mcp" {
650-
w.startNotionMCPFlow(rw, r, id, flow)
654+
if flow.Flow == "notion_mcp" || flow.Flow == "dynamic_mcp" {
655+
w.startDynamicMCPFlow(rw, r, id, flow)
651656
return
652657
}
653658

@@ -677,14 +682,18 @@ func (w *webMux) apiOAuthStart(rw http.ResponseWriter, r *http.Request) {
677682
writeJSON(rw, map[string]string{"auth_url": authURL, "state": state})
678683
}
679684

680-
// startNotionMCPFlow drives the mcp.notion.com auth-code flow with RFC
681-
// 7591 dynamic client registration. Notion accepts arbitrary redirect
682-
// URIs at registration, so we register the dashboard's own
683-
// /oauth/callback page — the page auto-exchanges via /api/oauth/exchange
684-
// when it loads, with copy-paste from the URL bar as a fallback.
685-
func (w *webMux) startNotionMCPFlow(rw http.ResponseWriter, r *http.Request, id string, flow *OAuthIntegration) {
686-
redirectURI := w.dashboardRedirectURI(r, "/oauth/callback")
687-
clientID, err := registerOAuthClient(r.Context(), flow.OAuth.RegisterURL, redirectURI)
685+
// startDynamicMCPFlow drives auth-code OAuth flows with RFC 7591
686+
// dynamic client registration. Plugins may pin a provider-accepted
687+
// redirect URI (Amplitude uses a localhost loopback URI); otherwise we
688+
// register the dashboard's own /oauth/callback page, which
689+
// auto-exchanges via /api/oauth/exchange when it loads, with copy-paste
690+
// from the URL bar as a fallback.
691+
func (w *webMux) startDynamicMCPFlow(rw http.ResponseWriter, r *http.Request, id string, flow *OAuthIntegration) {
692+
redirectURI := strings.TrimSpace(flow.OAuth.RedirectURI)
693+
if redirectURI == "" {
694+
redirectURI = w.dashboardRedirectURI(r, "/oauth/callback")
695+
}
696+
clientID, err := registerOAuthClient(r.Context(), flow.OAuth.RegisterURL, redirectURI, flow.OAuth.Scopes)
688697
if err != nil {
689698
http.Error(rw, "dynamic client registration: "+err.Error(), http.StatusBadGateway)
690699
return
@@ -697,7 +706,11 @@ func (w *webMux) startNotionMCPFlow(rw http.ResponseWriter, r *http.Request, id
697706
ClientID: clientID,
698707
Scopes: flow.OAuth.Scopes,
699708
RedirectURL: redirectURI,
700-
Endpoint: oauth2.Endpoint{AuthURL: flow.OAuth.AuthURL, TokenURL: flow.OAuth.TokenURL},
709+
Endpoint: oauth2.Endpoint{
710+
AuthURL: flow.OAuth.AuthURL,
711+
TokenURL: flow.OAuth.TokenURL,
712+
AuthStyle: oauth2.AuthStyleInParams,
713+
},
701714
}
702715
authURL := cfg.AuthCodeURL(state,
703716
oauth2.SetAuthURLParam("code_challenge", challenge),
@@ -740,14 +753,18 @@ func (w *webMux) dashboardRedirectURI(r *http.Request, path string) string {
740753
// registerOAuthClient performs RFC 7591 dynamic client registration
741754
// against `registerURL`, asking for a public PKCE client bound to the
742755
// given redirect URI. Returns the issued client_id.
743-
func registerOAuthClient(ctx context.Context, registerURL, redirectURI string) (string, error) {
744-
body, _ := json.Marshal(map[string]any{
756+
func registerOAuthClient(ctx context.Context, registerURL, redirectURI string, scopes []string) (string, error) {
757+
bodyMap := map[string]any{
745758
"client_name": "clawpatrol",
746759
"redirect_uris": []string{redirectURI},
747760
"grant_types": []string{"authorization_code", "refresh_token"},
748761
"response_types": []string{"code"},
749762
"token_endpoint_auth_method": "none",
750-
})
763+
}
764+
if len(scopes) > 0 {
765+
bodyMap["scope"] = strings.Join(scopes, " ")
766+
}
767+
body, _ := json.Marshal(bodyMap)
751768
req, err := http.NewRequestWithContext(ctx, "POST", registerURL, bytes.NewReader(body))
752769
if err != nil {
753770
return "", err
@@ -775,6 +792,70 @@ func registerOAuthClient(ctx context.Context, registerURL, redirectURI string) (
775792
return rr.ClientID, nil
776793
}
777794

795+
func normalizeOAuthExchangeInput(input string) string {
796+
code, _ := parseOAuthExchangeInput(input)
797+
return code
798+
}
799+
800+
func parseOAuthExchangeInput(input string) (code, oauthErr string) {
801+
s := strings.TrimSpace(input)
802+
if s == "" {
803+
return "", ""
804+
}
805+
806+
// Operators often paste the whole callback URL after providers
807+
// redirect to a loopback URI (for example
808+
// localhost:8900/callback?code=...&state=...) or just its raw query
809+
// string, with parameters in any order. Try query-shaped inputs
810+
// first, then fall back to a bare opaque code.
811+
if code, oauthErr, ok := parseOAuthRawQuery(s); ok {
812+
return code, oauthErr
813+
}
814+
if strings.Contains(s, "?") {
815+
candidate := s
816+
if !strings.Contains(candidate, "://") && !strings.HasPrefix(candidate, "/") {
817+
// Browsers omit the scheme when the URL is copied from the
818+
// address bar; url.Parse needs one to find the query.
819+
candidate = "http://" + candidate
820+
}
821+
if u, err := url.Parse(candidate); err == nil {
822+
if code, oauthErr := oauthCodeOrError(u.Query()); code != "" || oauthErr != "" {
823+
return code, oauthErr
824+
}
825+
}
826+
}
827+
828+
// Otherwise treat the input as a bare code. Preserve '&' and '=' because
829+
// opaque provider codes may contain them; only strip fragment/query suffixes.
830+
if i := strings.IndexAny(s, "#?"); i > 0 {
831+
s = s[:i]
832+
}
833+
return strings.TrimSpace(s), ""
834+
}
835+
836+
func parseOAuthRawQuery(input string) (code, oauthErr string, ok bool) {
837+
raw := strings.TrimPrefix(input, "?")
838+
vals, err := url.ParseQuery(raw)
839+
if err != nil {
840+
return "", "", false
841+
}
842+
code, oauthErr = oauthCodeOrError(vals)
843+
return code, oauthErr, code != "" || oauthErr != ""
844+
}
845+
846+
func oauthCodeOrError(vals url.Values) (code, oauthErr string) {
847+
if code := strings.TrimSpace(vals.Get("code")); code != "" {
848+
return code, ""
849+
}
850+
if errCode := strings.TrimSpace(vals.Get("error")); errCode != "" {
851+
if desc := strings.TrimSpace(vals.Get("error_description")); desc != "" {
852+
return "", fmt.Sprintf("%s: %s", errCode, desc)
853+
}
854+
return "", errCode
855+
}
856+
return "", ""
857+
}
858+
778859
func (w *webMux) apiOAuthExchange(rw http.ResponseWriter, r *http.Request) {
779860
if r.Method != "POST" {
780861
http.Error(rw, "POST", http.StatusMethodNotAllowed)
@@ -788,14 +869,16 @@ func (w *webMux) apiOAuthExchange(rw http.ResponseWriter, r *http.Request) {
788869
http.Error(rw, err.Error(), 400)
789870
return
790871
}
791-
body.Code = strings.TrimSpace(body.Code)
872+
var oauthErr string
873+
body.Code, oauthErr = parseOAuthExchangeInput(body.Code)
874+
if oauthErr != "" {
875+
http.Error(rw, "oauth error: "+oauthErr, 400)
876+
return
877+
}
792878
if body.Code == "" || body.State == "" {
793879
http.Error(rw, "missing code/state", 400)
794880
return
795881
}
796-
if i := strings.IndexAny(body.Code, "#&?"); i > 0 {
797-
body.Code = body.Code[:i]
798-
}
799882

800883
w.mu.Lock()
801884
sess, ok := w.sessions[body.State]
@@ -1155,56 +1238,52 @@ func isAnthropicTokenURL(u string) bool {
11551238
return strings.Contains(u, "anthropic.com/")
11561239
}
11571240

1158-
func isNotionMCPTokenURL(u string) bool {
1159-
return strings.Contains(u, "mcp.notion.com/")
1160-
}
1161-
1162-
// notionMCPRefreshSource refreshes Notion MCP OAuth tokens via the
1241+
// dynamicMCPRefreshSource refreshes hosted MCP OAuth tokens via the
11631242
// form-urlencoded body the spec mandates. The client_id is read from
11641243
// cfg.ClientID, which the registry restores from the persisted
11651244
// credentials.client_id column on boot. Stateful: holds the current
11661245
// token (with refresh_token) and rotates it on each refresh.
1167-
type notionMCPRefreshSource struct {
1246+
type dynamicMCPRefreshSource struct {
11681247
mu sync.Mutex
11691248
cfg *oauth2.Config
11701249
current *oauth2.Token
11711250
}
11721251

1173-
func (n *notionMCPRefreshSource) Token() (*oauth2.Token, error) {
1174-
n.mu.Lock()
1175-
defer n.mu.Unlock()
1176-
if n.current.Valid() {
1177-
return n.current, nil
1252+
func (d *dynamicMCPRefreshSource) Token() (*oauth2.Token, error) {
1253+
d.mu.Lock()
1254+
defer d.mu.Unlock()
1255+
if d.current.Valid() {
1256+
return d.current, nil
11781257
}
1179-
if n.current.RefreshToken == "" {
1180-
return nil, fmt.Errorf("notion_mcp refresh: no refresh_token")
1258+
if d.current.RefreshToken == "" {
1259+
return nil, fmt.Errorf("dynamic_mcp refresh: no refresh_token")
11811260
}
1182-
if n.cfg.ClientID == "" {
1183-
return nil, fmt.Errorf("notion_mcp refresh: no client_id (dynamic registration was never persisted)")
1261+
if d.cfg.ClientID == "" {
1262+
return nil, fmt.Errorf("dynamic_mcp refresh: no client_id (dynamic registration was never persisted)")
11841263
}
11851264
form := url.Values{}
11861265
form.Set("grant_type", "refresh_token")
1187-
form.Set("refresh_token", n.current.RefreshToken)
1188-
form.Set("client_id", n.cfg.ClientID)
1266+
form.Set("refresh_token", d.current.RefreshToken)
1267+
form.Set("client_id", d.cfg.ClientID)
11891268
// See anthropicRefreshSource.Token: oauth2 has no ctx on Token(),
1190-
// so we bound the upstream round-trip here so n.mu stays available
1191-
// even if Notion's token endpoint hangs.
1269+
// so we bound the upstream round-trip here so d.mu stays available
1270+
// even if the MCP token endpoint hangs.
11921271
ctx, cancel := context.WithTimeout(context.Background(), oauthUpstreamTimeout)
11931272
defer cancel()
1194-
req, err := http.NewRequestWithContext(ctx, "POST", n.cfg.Endpoint.TokenURL, strings.NewReader(form.Encode()))
1273+
req, err := http.NewRequestWithContext(ctx, "POST", d.cfg.Endpoint.TokenURL, strings.NewReader(form.Encode()))
11951274
if err != nil {
1196-
return nil, fmt.Errorf("notion_mcp refresh: build request: %w", err)
1275+
return nil, fmt.Errorf("dynamic_mcp refresh: build request: %w", err)
11971276
}
11981277
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
11991278
req.Header.Set("Accept", "application/json")
12001279
resp, err := http.DefaultClient.Do(req)
12011280
if err != nil {
1202-
return nil, fmt.Errorf("notion_mcp refresh: %w", err)
1281+
return nil, fmt.Errorf("dynamic_mcp refresh: %w", err)
12031282
}
12041283
defer func() { _ = resp.Body.Close() }()
12051284
respBytes, _ := io.ReadAll(io.LimitReader(resp.Body, oauthResponseLimit))
12061285
if resp.StatusCode != 200 {
1207-
return nil, fmt.Errorf("notion_mcp refresh %d: %s", resp.StatusCode, string(respBytes))
1286+
return nil, fmt.Errorf("dynamic_mcp refresh %d: %s", resp.StatusCode, string(respBytes))
12081287
}
12091288
var tr struct {
12101289
AccessToken string `json:"access_token"`
@@ -1221,12 +1300,12 @@ func (n *notionMCPRefreshSource) Token() (*oauth2.Token, error) {
12211300
TokenType: tr.TokenType,
12221301
}
12231302
if t.RefreshToken == "" {
1224-
t.RefreshToken = n.current.RefreshToken
1303+
t.RefreshToken = d.current.RefreshToken
12251304
}
12261305
if tr.ExpiresIn > 0 {
12271306
t.Expiry = time.Now().Add(time.Duration(tr.ExpiresIn) * time.Second)
12281307
}
1229-
n.current = t
1308+
d.current = t
12301309
return t, nil
12311310
}
12321311

@@ -1319,7 +1398,7 @@ func (w *webMux) apiOAuthRevoke(rw http.ResponseWriter, r *http.Request) {
13191398

13201399
// oauthCallbackHTML is served at GET /oauth/callback for
13211400
// dynamic-registration flows that redirect back to the dashboard
1322-
// (notion_mcp). The inline JS extracts ?code & ?state, POSTs them to
1401+
// (notion_mcp/dynamic_mcp). The inline JS extracts ?code & ?state, POSTs them to
13231402
// /api/oauth/exchange so the original ConnectModal sees the credential
13241403
// connect itself, and shows the code prominently as a copy-paste
13251404
// fallback if the auto-exchange fails (e.g. dashboard secret expired,

0 commit comments

Comments
 (0)