HTTP Client
aio-fluid provides async HTTP client wrappers around aiohttp and httpx with a unified interface for making requests, handling errors, and monitoring calls.
from fluid.utils.http_client import AioHttpClient, HttpxClient
Both clients implement the same HttpClient base class, so they can be used interchangeably.
Usage
async with AioHttpClient() as client:
data = await client.get("https://api.example.com/items")
async with HttpxClient() as client:
data = await client.post("https://api.example.com/items", json={"name": "foo"})
Response
fluid.utils.http_client.HttpResponse
Bases: ABC
url
abstractmethod
property
status_code
abstractmethod
property
method
abstractmethod
property
json
abstractmethod
async
Source code in fluid/utils/http_client.py
| @abstractmethod
async def json(self) -> ResponseType: ...
|
text
abstractmethod
async
Source code in fluid/utils/http_client.py
| @abstractmethod
async def text(self) -> str: ...
|
bytes
abstractmethod
async
Source code in fluid/utils/http_client.py
| @abstractmethod
async def bytes(self) -> bytes: ...
|
fluid.utils.http_client.HttpResponseError
HttpResponseError(response, data)
Bases: RuntimeError
Source code in fluid/utils/http_client.py
| def __init__(self, response: HttpResponse, data: ResponseType) -> None:
self.response = response
self.data = {
"response": data,
"request_url": response.url,
"request_method": response.method,
"response_status": self.status_code,
}
|
response
instance-attribute
data
instance-attribute
data = {
"response": data,
"request_url": url,
"request_method": method,
"response_status": status_code,
}
Clients
fluid.utils.http_client.HttpClient
dataclass
HttpClient(
session=None,
content_type="application/json",
session_owner=False,
ResponseError=HttpResponseError,
ok_status=frozenset((200, 201)),
default_headers=(
lambda: {"user-agent": HTTP_USER_AGENT}
)(),
)
Bases: Generic[S, R], ABC
Base class for Http clients
session
class-attribute
instance-attribute
content_type
class-attribute
instance-attribute
content_type = 'application/json'
session_owner
class-attribute
instance-attribute
ResponseError
class-attribute
instance-attribute
ok_status
class-attribute
instance-attribute
ok_status = field(default=frozenset((200, 201)), repr=False)
default_headers = field(
default_factory=lambda: {"user-agent": HTTP_USER_AGENT}
)
new_session
abstractmethod
Source code in fluid/utils/http_client.py
| @abstractmethod
def new_session(self, **kwargs: Any) -> S: ...
|
new_response
abstractmethod
Source code in fluid/utils/http_client.py
| @abstractmethod
def new_response(self, response: R) -> GenericHttpResponse[R]: ...
|
close
abstractmethod
async
Source code in fluid/utils/http_client.py
| @abstractmethod
async def close(self) -> None: ...
|
get_session
Source code in fluid/utils/http_client.py
| def get_session(self) -> S:
if not self.session:
self.session_owner = True
self.session = self.new_session()
return self.session
|
get
async
Source code in fluid/utils/http_client.py
| async def get(self, url: str, **kwargs: Any) -> ResponseType:
return await self.request("GET", url, **kwargs)
|
patch
async
Source code in fluid/utils/http_client.py
| async def patch(self, url: str, **kwargs: Any) -> ResponseType:
return await self.request("PATCH", url, **kwargs)
|
post
async
Source code in fluid/utils/http_client.py
| async def post(self, url: str, **kwargs: Any) -> ResponseType:
return await self.request("POST", url, **kwargs)
|
put
async
Source code in fluid/utils/http_client.py
| async def put(self, url: str, **kwargs: Any) -> ResponseType:
return await self.request("PUT", url, **kwargs)
|
delete
async
Source code in fluid/utils/http_client.py
| async def delete(self, url: str, **kwargs: Any) -> ResponseType:
return await self.request("DELETE", url, **kwargs)
|
request
async
request(
method,
url,
*,
headers=None,
callback=None,
monitor_http=None,
**kw
)
Source code in fluid/utils/http_client.py
| async def request(
self,
method: str,
url: str,
*,
headers: dict | None = None,
callback: Callable | bool | None = None,
monitor_http: HttpPathFn | None = None,
**kw: Any,
) -> ResponseType:
session = self.get_session()
_headers = self.get_default_headers()
_headers.update(headers or ())
method = method or "GET"
start = time.monotonic()
inner: R = await session.request(
method,
url,
headers=_headers,
**kw,
) # type: ignore
response = self.new_response(inner)
if monitor_http:
monitor_http_call(
response,
time.monotonic() - start,
sanitization_fn=monitor_http,
)
if callback:
if callback is True:
return response
else:
return await callback(response)
if self.ok(response):
data = await self.response_data(response)
elif response.status_code == 204:
data = {}
else:
await self.response_error(response)
return data
|
ok
Source code in fluid/utils/http_client.py
| def ok(self, response: HttpResponse) -> bool:
return response.status_code in self.ok_status
|
Source code in fluid/utils/http_client.py
| def get_default_headers(self) -> dict[str, str]:
headers = self.default_headers.copy()
if self.content_type:
headers["accept"] = self.content_type
return headers
|
response_error
async
classmethod
Source code in fluid/utils/http_client.py
| @classmethod
async def response_error(cls, response: HttpResponse) -> None:
try:
data = await cls.response_data(response)
except Exception:
data = {"message": await response.text()}
raise cls.ResponseError(response, data)
|
response_data
async
classmethod
Source code in fluid/utils/http_client.py
| @classmethod
async def response_data(cls, response: HttpResponse) -> ResponseType:
if "text/csv" in response.headers["content-type"]:
return await response.text()
return await response.json()
|
fluid.utils.http_client.AioHttpClient
dataclass
AioHttpClient(
session=None,
content_type="application/json",
session_owner=False,
ResponseError=HttpResponseError,
ok_status=frozenset((200, 201)),
default_headers=(
lambda: {"user-agent": HTTP_USER_AGENT}
)(),
)
Bases: HttpClient[ClientSession, ClientResponse]
session
class-attribute
instance-attribute
content_type
class-attribute
instance-attribute
content_type = 'application/json'
session_owner
class-attribute
instance-attribute
ResponseError
class-attribute
instance-attribute
ok_status
class-attribute
instance-attribute
ok_status = field(default=frozenset((200, 201)), repr=False)
default_headers = field(
default_factory=lambda: {"user-agent": HTTP_USER_AGENT}
)
new_session
Source code in fluid/utils/http_client.py
| def new_session(self, **kwargs: Any) -> client.ClientSession:
return client.ClientSession(**kwargs)
|
new_response
Source code in fluid/utils/http_client.py
| def new_response(
self, response: client.ClientResponse
) -> GenericHttpResponse[client.ClientResponse]:
return AioHttpResponse(response)
|
close
async
Source code in fluid/utils/http_client.py
| async def close(self) -> None:
if self.session and self.session_owner:
await self.session.close()
self.session = None
|
get_session
Source code in fluid/utils/http_client.py
| def get_session(self) -> S:
if not self.session:
self.session_owner = True
self.session = self.new_session()
return self.session
|
get
async
Source code in fluid/utils/http_client.py
| async def get(self, url: str, **kwargs: Any) -> ResponseType:
return await self.request("GET", url, **kwargs)
|
patch
async
Source code in fluid/utils/http_client.py
| async def patch(self, url: str, **kwargs: Any) -> ResponseType:
return await self.request("PATCH", url, **kwargs)
|
post
async
Source code in fluid/utils/http_client.py
| async def post(self, url: str, **kwargs: Any) -> ResponseType:
return await self.request("POST", url, **kwargs)
|
put
async
Source code in fluid/utils/http_client.py
| async def put(self, url: str, **kwargs: Any) -> ResponseType:
return await self.request("PUT", url, **kwargs)
|
delete
async
Source code in fluid/utils/http_client.py
| async def delete(self, url: str, **kwargs: Any) -> ResponseType:
return await self.request("DELETE", url, **kwargs)
|
request
async
request(
method,
url,
*,
headers=None,
callback=None,
monitor_http=None,
**kw
)
Source code in fluid/utils/http_client.py
| async def request(
self,
method: str,
url: str,
*,
headers: dict | None = None,
callback: Callable | bool | None = None,
monitor_http: HttpPathFn | None = None,
**kw: Any,
) -> ResponseType:
session = self.get_session()
_headers = self.get_default_headers()
_headers.update(headers or ())
method = method or "GET"
start = time.monotonic()
inner: R = await session.request(
method,
url,
headers=_headers,
**kw,
) # type: ignore
response = self.new_response(inner)
if monitor_http:
monitor_http_call(
response,
time.monotonic() - start,
sanitization_fn=monitor_http,
)
if callback:
if callback is True:
return response
else:
return await callback(response)
if self.ok(response):
data = await self.response_data(response)
elif response.status_code == 204:
data = {}
else:
await self.response_error(response)
return data
|
ok
Source code in fluid/utils/http_client.py
| def ok(self, response: HttpResponse) -> bool:
return response.status_code in self.ok_status
|
Source code in fluid/utils/http_client.py
| def get_default_headers(self) -> dict[str, str]:
headers = self.default_headers.copy()
if self.content_type:
headers["accept"] = self.content_type
return headers
|
response_error
async
classmethod
Source code in fluid/utils/http_client.py
| @classmethod
async def response_error(cls, response: HttpResponse) -> None:
try:
data = await cls.response_data(response)
except Exception:
data = {"message": await response.text()}
raise cls.ResponseError(response, data)
|
response_data
async
classmethod
Source code in fluid/utils/http_client.py
| @classmethod
async def response_data(cls, response: HttpResponse) -> ResponseType:
if "text/csv" in response.headers["content-type"]:
return await response.text()
return await response.json()
|
fluid.utils.http_client.HttpxClient
dataclass
HttpxClient(
session=None,
content_type="application/json",
session_owner=False,
ResponseError=HttpResponseError,
ok_status=frozenset((200, 201)),
default_headers=(
lambda: {"user-agent": HTTP_USER_AGENT}
)(),
)
Bases: HttpClient[AsyncClient, Response]
session
class-attribute
instance-attribute
content_type
class-attribute
instance-attribute
content_type = 'application/json'
session_owner
class-attribute
instance-attribute
ResponseError
class-attribute
instance-attribute
ok_status
class-attribute
instance-attribute
ok_status = field(default=frozenset((200, 201)), repr=False)
default_headers = field(
default_factory=lambda: {"user-agent": HTTP_USER_AGENT}
)
new_session
Source code in fluid/utils/http_client.py
| def new_session(self, **kwargs: Any) -> httpx.AsyncClient:
return httpx.AsyncClient(**kwargs)
|
new_response
Source code in fluid/utils/http_client.py
| def new_response(
self, response: httpx.Response
) -> GenericHttpResponse[httpx.Response]:
return HttpxResponse(response)
|
close
async
Source code in fluid/utils/http_client.py
| async def close(self) -> None:
if self.session and self.session_owner:
await self.session.aclose()
self.session = None
|
get_session
Source code in fluid/utils/http_client.py
| def get_session(self) -> S:
if not self.session:
self.session_owner = True
self.session = self.new_session()
return self.session
|
get
async
Source code in fluid/utils/http_client.py
| async def get(self, url: str, **kwargs: Any) -> ResponseType:
return await self.request("GET", url, **kwargs)
|
patch
async
Source code in fluid/utils/http_client.py
| async def patch(self, url: str, **kwargs: Any) -> ResponseType:
return await self.request("PATCH", url, **kwargs)
|
post
async
Source code in fluid/utils/http_client.py
| async def post(self, url: str, **kwargs: Any) -> ResponseType:
return await self.request("POST", url, **kwargs)
|
put
async
Source code in fluid/utils/http_client.py
| async def put(self, url: str, **kwargs: Any) -> ResponseType:
return await self.request("PUT", url, **kwargs)
|
delete
async
Source code in fluid/utils/http_client.py
| async def delete(self, url: str, **kwargs: Any) -> ResponseType:
return await self.request("DELETE", url, **kwargs)
|
request
async
request(
method,
url,
*,
headers=None,
callback=None,
monitor_http=None,
**kw
)
Source code in fluid/utils/http_client.py
| async def request(
self,
method: str,
url: str,
*,
headers: dict | None = None,
callback: Callable | bool | None = None,
monitor_http: HttpPathFn | None = None,
**kw: Any,
) -> ResponseType:
session = self.get_session()
_headers = self.get_default_headers()
_headers.update(headers or ())
method = method or "GET"
start = time.monotonic()
inner: R = await session.request(
method,
url,
headers=_headers,
**kw,
) # type: ignore
response = self.new_response(inner)
if monitor_http:
monitor_http_call(
response,
time.monotonic() - start,
sanitization_fn=monitor_http,
)
if callback:
if callback is True:
return response
else:
return await callback(response)
if self.ok(response):
data = await self.response_data(response)
elif response.status_code == 204:
data = {}
else:
await self.response_error(response)
return data
|
ok
Source code in fluid/utils/http_client.py
| def ok(self, response: HttpResponse) -> bool:
return response.status_code in self.ok_status
|
Source code in fluid/utils/http_client.py
| def get_default_headers(self) -> dict[str, str]:
headers = self.default_headers.copy()
if self.content_type:
headers["accept"] = self.content_type
return headers
|
response_error
async
classmethod
Source code in fluid/utils/http_client.py
| @classmethod
async def response_error(cls, response: HttpResponse) -> None:
try:
data = await cls.response_data(response)
except Exception:
data = {"message": await response.text()}
raise cls.ResponseError(response, data)
|
response_data
async
classmethod
Source code in fluid/utils/http_client.py
| @classmethod
async def response_data(cls, response: HttpResponse) -> ResponseType:
if "text/csv" in response.headers["content-type"]:
return await response.text()
return await response.json()
|