generated from coulomb/repo-seed
Implement Topaz adapter
This commit is contained in:
159
internal/adapters/topaz/http_client.go
Normal file
159
internal/adapters/topaz/http_client.go
Normal file
@@ -0,0 +1,159 @@
|
||||
package topaz
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// HTTPClient talks to Topaz's REST gateways. It is the runnable development
|
||||
// client used by examples/topaz; production deployments can swap in a gRPC
|
||||
// client behind the same Client interface.
|
||||
type HTTPClient struct {
|
||||
DirectoryURL string
|
||||
HealthURL string
|
||||
Client *http.Client
|
||||
BundleSink BundleSink
|
||||
}
|
||||
|
||||
// NewHTTPClient creates an HTTP-backed Topaz client.
|
||||
func NewHTTPClient(directoryURL string, bundleSink BundleSink) (*HTTPClient, error) {
|
||||
if directoryURL == "" {
|
||||
return nil, fmt.Errorf("directory URL is required")
|
||||
}
|
||||
return &HTTPClient{
|
||||
DirectoryURL: strings.TrimRight(directoryURL, "/"),
|
||||
Client: &http.Client{Timeout: 10 * time.Second},
|
||||
BundleSink: bundleSink,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Check calls Topaz's directory check endpoint.
|
||||
func (c *HTTPClient) Check(ctx context.Context, request DirectoryCheckRequest) (CheckResult, error) {
|
||||
var response struct {
|
||||
Check bool `json:"check"`
|
||||
ETag string `json:"etag,omitempty"`
|
||||
}
|
||||
if err := c.postJSON(ctx, "/api/v3/directory/check", request, &response); err != nil {
|
||||
return CheckResult{}, NewBackendError(FailureUnavailable, "check", err)
|
||||
}
|
||||
return CheckResult{
|
||||
Allowed: response.Check,
|
||||
DirectoryETag: response.ETag,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// PutObject writes one directory object.
|
||||
func (c *HTTPClient) PutObject(ctx context.Context, object DirectoryObject) error {
|
||||
body := map[string]DirectoryObject{"object": object}
|
||||
if err := c.postJSON(ctx, "/api/v3/directory/object", body, nil); err != nil {
|
||||
return NewBackendError(FailureUnavailable, "put object", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// PutRelation writes one directory relation.
|
||||
func (c *HTTPClient) PutRelation(ctx context.Context, relation DirectoryRelation) (WriteResult, error) {
|
||||
body := map[string]DirectoryRelation{"relation": relation}
|
||||
var response struct {
|
||||
ETag string `json:"etag,omitempty"`
|
||||
}
|
||||
if err := c.postJSON(ctx, "/api/v3/directory/relation", body, &response); err != nil {
|
||||
return WriteResult{}, NewBackendError(FailureUnavailable, "put relation", err)
|
||||
}
|
||||
return WriteResult{ETag: response.ETag}, nil
|
||||
}
|
||||
|
||||
// PutManifest writes the directory manifest. Topaz versions have exposed both
|
||||
// /directory/manifest and /model, so the client tries the current endpoint and
|
||||
// falls back to the spike-compatible one.
|
||||
func (c *HTTPClient) PutManifest(ctx context.Context, manifest []byte) error {
|
||||
if err := c.postYAML(ctx, "/api/v3/directory/manifest", manifest); err == nil {
|
||||
return nil
|
||||
}
|
||||
if err := c.postYAML(ctx, "/api/v3/model", manifest); err != nil {
|
||||
return NewBackendError(FailureUnavailable, "put manifest", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// PutPolicyBundle delegates bundle publication to the configured sink.
|
||||
func (c *HTTPClient) PutPolicyBundle(ctx context.Context, bundle PolicyBundle) error {
|
||||
if c.BundleSink == nil {
|
||||
return NewBackendError(FailureUnsupported, "put policy bundle", fmt.Errorf("no bundle sink configured"))
|
||||
}
|
||||
return c.BundleSink.PutPolicyBundle(ctx, bundle)
|
||||
}
|
||||
|
||||
// Health checks Topaz readiness. If HealthURL is empty, it checks the
|
||||
// directory gateway root.
|
||||
func (c *HTTPClient) Health(ctx context.Context) error {
|
||||
url := c.HealthURL
|
||||
if url == "" {
|
||||
url = c.DirectoryURL
|
||||
}
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, strings.TrimRight(url, "/"), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := c.httpClient().Do(req)
|
||||
if err != nil {
|
||||
return NewBackendError(FailureUnavailable, "health", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode >= 500 {
|
||||
return NewBackendError(FailureUnavailable, "health", fmt.Errorf("status %s", resp.Status))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *HTTPClient) postJSON(ctx context.Context, path string, body any, out any) error {
|
||||
data, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.do(ctx, http.MethodPost, c.DirectoryURL+path, "application/json", data, out)
|
||||
}
|
||||
|
||||
func (c *HTTPClient) postYAML(ctx context.Context, path string, body []byte) error {
|
||||
return c.do(ctx, http.MethodPost, c.DirectoryURL+path, "application/yaml", body, nil)
|
||||
}
|
||||
|
||||
func (c *HTTPClient) do(ctx context.Context, method, url, contentType string, body []byte, out any) error {
|
||||
req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if contentType != "" {
|
||||
req.Header.Set("Content-Type", contentType)
|
||||
}
|
||||
resp, err := c.httpClient().Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
responseBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
||||
return fmt.Errorf("status %s: %s", resp.Status, strings.TrimSpace(string(responseBody)))
|
||||
}
|
||||
if out == nil {
|
||||
io.Copy(io.Discard, resp.Body)
|
||||
return nil
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(out); err != nil && err != io.EOF {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *HTTPClient) httpClient() *http.Client {
|
||||
if c.Client != nil {
|
||||
return c.Client
|
||||
}
|
||||
return http.DefaultClient
|
||||
}
|
||||
Reference in New Issue
Block a user