
Commit d5e330b

implement rate limit retries with tenacity
1 parent 32009f7 commit d5e330b

8 files changed: +367 additions, −76 deletions

pyproject.toml

Lines changed: 3 additions & 1 deletion
@@ -1,6 +1,6 @@
 [project]
 name = "oxylabs-ai-studio"
-version = "0.2.13"
+version = "0.2.14"
 description = "Oxylabs studio python sdk"
 readme = "README.md"
 keywords = ["oxylabs", "ai", "studio"]
@@ -10,6 +10,7 @@ dependencies = [
     "pydantic>=2.0.0",
     "pydantic-settings>=2.9.1",
     "python-dotenv>=1.1.0",
+    "tenacity>=9.1.2",
 ]
 
 [dependency-groups]
@@ -51,3 +52,4 @@ select = [
     "W",    # pycodestyle warning
     "YTT",  # wrong usage of sys.info
 ]
+ignore = ["BLE001"]

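The new tenacity pin backs the call_api / call_api_async helpers that every request below is routed through. Their implementation lives in one of the other changed files and is not part of this excerpt, so the following is only a hypothetical sketch of the pattern the commit message names (retry rate-limited requests with tenacity): the RateLimitError name, the decorator arguments, and the generic client.request dispatch are assumptions, not the shipped code.

    # Hypothetical sketch of a tenacity-backed call_api; the actual helper
    # added by this commit is in a file not shown in this diff.
    import httpx
    from tenacity import (
        retry,
        retry_if_exception_type,
        stop_after_attempt,
        wait_exponential,
    )


    class RateLimitError(Exception):
        """Raised on HTTP 429 so tenacity knows the call is retryable."""


    @retry(
        retry=retry_if_exception_type(RateLimitError),
        wait=wait_exponential(multiplier=1, min=1, max=30),  # 1s, 2s, 4s, ... capped at 30s
        stop=stop_after_attempt(5),
        reraise=True,
    )
    def call_api(
        client: httpx.Client,
        url: str,
        method: str,
        body: dict | None = None,
        params: dict | None = None,
    ) -> httpx.Response:
        # Dispatch on the method string, matching the method="POST"/"GET"
        # call sites in the diffs below.
        response = client.request(method, url, json=body, params=params)
        if response.status_code == 429:
            # Convert the rate-limit response into an exception tenacity retries on.
            raise RateLimitError(f"Rate limited on {url}")
        return response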
src/oxylabs_ai_studio/apps/ai_crawler.py

Lines changed: 50 additions & 15 deletions
@@ -9,7 +9,7 @@
 from oxylabs_ai_studio.models import SchemaResponse
 
 CRAWLER_TIMEOUT_SECONDS = 60 * 10
-POLL_INTERVAL_SECONDS = 3
+POLL_INTERVAL_SECONDS = 5
 POLL_MAX_ATTEMPTS = CRAWLER_TIMEOUT_SECONDS // POLL_INTERVAL_SECONDS
 
 logger = get_logger(__name__)
@@ -51,7 +51,9 @@ def crawl(
             "geo_location": geo_location,
         }
         client = self.get_client()
-        create_response = client.post(url="/extract/run", json=body)
+        create_response = self.call_api(
+            client=client, url="/extract/run", method="POST", body=body
+        )
         if create_response.status_code != 200:
             raise Exception(
                 f"Failed to create crawl job for {url}: {create_response.text}"
@@ -61,14 +63,22 @@
         logger.info(f"Starting crawl for url: {url}. Job id: {run_id}.")
         try:
             for _ in range(POLL_MAX_ATTEMPTS):
-                get_response = client.get(
-                    "/extract/run/data", params={"run_id": run_id}
-                )
+                try:
+                    get_response = self.call_api(
+                        client=client,
+                        url="/extract/run/data",
+                        method="GET",
+                        params={"run_id": run_id},
+                    )
+                except Exception:
+                    time.sleep(POLL_INTERVAL_SECONDS)
+                    continue
                 if get_response.status_code == 202:
                     time.sleep(POLL_INTERVAL_SECONDS)
                     continue
                 if get_response.status_code != 200:
-                    raise Exception(f"Failed to crawl {url}: {get_response.text}")
+                    time.sleep(POLL_INTERVAL_SECONDS)
+                    continue
                 resp_body = get_response.json()
                 if resp_body["status"] == "processing":
                     time.sleep(POLL_INTERVAL_SECONDS)
@@ -80,7 +90,11 @@
                         data=resp_body["data"],
                     )
                 if resp_body["status"] == "failed":
-                    raise Exception(f"Failed to crawl {url}.")
+                    return AiCrawlerJob(
+                        run_id=run_id,
+                        message=resp_body.get("error_code", None),
+                        data=None,
+                    )
                 time.sleep(POLL_INTERVAL_SECONDS)
         except KeyboardInterrupt:
             logger.info("[Cancelled] Crawling was cancelled by user.")
@@ -90,7 +104,12 @@ def crawl(
     def generate_schema(self, prompt: str) -> dict[str, Any] | None:
         logger.info("Generating schema")
         body = {"user_prompt": prompt}
-        response = self.get_client().post(url="/extract/generate-params", json=body)
+        response = self.call_api(
+            client=self.get_client(),
+            url="/extract/generate-params",
+            method="POST",
+            body=body,
+        )
         if response.status_code != 200:
             raise Exception(f"Failed to generate schema: {response.text}")
         json_response: SchemaResponse = response.json()
@@ -121,7 +140,9 @@ async def crawl_async(
             "geo_location": geo_location,
         }
         async with self.async_client() as client:
-            create_response = await client.post(url="/extract/run", json=body)
+            create_response = await self.call_api_async(
+                client=client, url="/extract/run", method="POST", body=body
+            )
             if create_response.status_code != 200:
                 raise Exception(
                     f"Failed to create crawl job for {url}: {create_response.text}"
@@ -131,14 +152,22 @@ async def crawl_async(
             logger.info(f"Starting async crawl for url: {url}. Job id: {run_id}.")
             try:
                 for _ in range(POLL_MAX_ATTEMPTS):
-                    get_response = await client.get(
-                        "/extract/run/data", params={"run_id": run_id}
-                    )
+                    try:
+                        get_response = await self.call_api_async(
+                            client=client,
+                            url="/extract/run/data",
+                            method="GET",
+                            params={"run_id": run_id},
+                        )
+                    except Exception:
+                        await asyncio.sleep(POLL_INTERVAL_SECONDS)
+                        continue
                     if get_response.status_code == 202:
                         await asyncio.sleep(POLL_INTERVAL_SECONDS)
                         continue
                     if get_response.status_code != 200:
-                        raise Exception(f"Failed to crawl {url}: {get_response.text}")
+                        await asyncio.sleep(POLL_INTERVAL_SECONDS)
+                        continue
                     resp_body = get_response.json()
                     if resp_body["status"] == "processing":
                         await asyncio.sleep(POLL_INTERVAL_SECONDS)
@@ -150,7 +179,11 @@ async def crawl_async(
                             data=resp_body["data"],
                         )
                     if resp_body["status"] == "failed":
-                        raise Exception(f"Failed to crawl {url}.")
+                        return AiCrawlerJob(
+                            run_id=run_id,
+                            message=resp_body.get("error_code", None),
+                            data=None,
+                        )
                     await asyncio.sleep(POLL_INTERVAL_SECONDS)
             except KeyboardInterrupt:
                 logger.info("[Cancelled] Crawling was cancelled by user.")
@@ -162,7 +195,9 @@ async def generate_schema_async(self, prompt: str) -> dict[str, Any] | None:
         logger.info("Generating schema (async)")
         body = {"user_prompt": prompt}
         async with self.async_client() as client:
-            response = await client.post(url="/extract/generate-params", json=body)
+            response = await self.call_api_async(
+                client=client, url="/extract/generate-params", method="POST", body=body
+            )
             if response.status_code != 200:
                 raise Exception(f"Failed to generate schema: {response.text}")
             json_response: SchemaResponse = response.json()

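Two behavioral changes in this file are worth flagging for callers. First, with the poll interval raised from 3 s to 5 s, the attempt budget becomes 600 // 5 = 120 polls for the crawler (and 300 // 5 = 60 for the map app below), so the wall-clock timeouts are unchanged. Second, a "failed" status no longer raises: the job is returned with data=None and the backend error_code in message, and transient non-200 poll responses are now retried instead of raising. A minimal usage sketch; the AiCrawler constructor and import path are assumptions (only run_id, message, and data are confirmed by this diff), and crawl parameters beyond url are omitted:

    # Usage sketch; AiCrawler constructor and import path are assumptions.
    from oxylabs_ai_studio.apps.ai_crawler import AiCrawler

    crawler = AiCrawler(api_key="YOUR_API_KEY")
    job = crawler.crawl(url="https://example.com")

    if job.data is None:
        # As of this commit, a failed run returns an AiCrawlerJob carrying
        # the backend error_code in `message` instead of raising.
        print(f"Crawl {job.run_id} failed: {job.message}")
    else:
        print(job.data)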
src/oxylabs_ai_studio/apps/ai_map.py

Lines changed: 47 additions & 13 deletions
@@ -9,7 +9,7 @@
 from oxylabs_ai_studio.logger import get_logger
 
 MAP_TIMEOUT_SECONDS = 60 * 5
-POLL_INTERVAL_SECONDS = 3
+POLL_INTERVAL_SECONDS = 5
 POLL_MAX_ATTEMPTS = MAP_TIMEOUT_SECONDS // POLL_INTERVAL_SECONDS
 
 logger = get_logger(__name__)
@@ -43,7 +43,9 @@ def map(
             "render_html": render_javascript,
         }
         client = self.get_client()
-        create_response = client.post(url="/map", json=body)
+        create_response = self.call_api(
+            client=client, url="/map", method="POST", body=body
+        )
         if create_response.status_code != 200:
             raise Exception(
                 f"Failed to create map job for {url}: {create_response.text}"
@@ -52,9 +54,19 @@ def map(
         run_id = resp_body["run_id"]
         try:
             for _ in range(POLL_MAX_ATTEMPTS):
-                get_response = client.get("/map/run", params={"run_id": run_id})
+                try:
+                    get_response = self.call_api(
+                        client=client,
+                        url="/map/run",
+                        method="GET",
+                        params={"run_id": run_id},
+                    )
+                except Exception:
+                    time.sleep(POLL_INTERVAL_SECONDS)
+                    continue
                 if get_response.status_code != 200:
-                    raise Exception(f"Failed to map {url}: {get_response.text}")
+                    time.sleep(POLL_INTERVAL_SECONDS)
+                    continue
                 resp_body = get_response.json()
                 if resp_body["status"] == "completed":
                     return AiMapJob(
@@ -63,7 +75,11 @@ def map(
                         data=self._get_data(client=client, run_id=run_id),
                     )
                 if resp_body["status"] == "failed":
-                    raise Exception(f"Failed to map {url}.")
+                    return AiMapJob(
+                        run_id=run_id,
+                        message=resp_body.get("error_code", None),
+                        data=None,
+                    )
                 time.sleep(POLL_INTERVAL_SECONDS)
         except KeyboardInterrupt:
             logger.info("[Cancelled] Mapping was cancelled by user.")
@@ -73,7 +89,9 @@ def map(
         raise TimeoutError(f"Failed to map {url}: timeout.")
 
     def _get_data(self, client: httpx.Client, run_id: str) -> dict[str, Any]:
-        get_response = client.get("/map/run/data", params={"run_id": run_id})
+        get_response = self.call_api(
+            client=client, url="/map/run/data", method="GET", params={"run_id": run_id}
+        )
         if get_response.status_code != 200:
             raise Exception(f"Failed to get data for run {run_id}: {get_response.text}")
         return get_response.json().get("data", {}) or {}
@@ -94,7 +112,9 @@ async def map_async(
             "render_html": render_javascript,
         }
         async with self.async_client() as client:
-            create_response = await client.post(url="/map", json=body)
+            create_response = await self.call_api_async(
+                client=client, url="/map", method="POST", body=body
+            )
             if create_response.status_code != 200:
                 raise Exception(
                     f"Failed to create map job for {url}: {create_response.text}"
@@ -103,11 +123,19 @@ async def map_async(
             run_id = resp_body["run_id"]
             try:
                 for _ in range(POLL_MAX_ATTEMPTS):
-                    get_response = await client.get(
-                        "/map/run", params={"run_id": run_id}
-                    )
+                    try:
+                        get_response = await self.call_api_async(
+                            client=client,
+                            url="/map/run",
+                            method="GET",
+                            params={"run_id": run_id},
+                        )
+                    except Exception:
+                        await asyncio.sleep(POLL_INTERVAL_SECONDS)
+                        continue
                     if get_response.status_code != 200:
-                        raise Exception(f"Failed to map {url}: {get_response.text}")
+                        await asyncio.sleep(POLL_INTERVAL_SECONDS)
+                        continue
                     resp_body = get_response.json()
                     if resp_body["status"] == "completed":
                         data = await self.get_data_async(client, run_id=run_id)
@@ -117,7 +145,11 @@ async def map_async(
                             data=data,
                         )
                     if resp_body["status"] == "failed":
-                        raise Exception(f"Failed to map {url}.")
+                        return AiMapJob(
+                            run_id=run_id,
+                            message=resp_body.get("error_code", None),
+                            data=None,
+                        )
                     await asyncio.sleep(POLL_INTERVAL_SECONDS)
             except KeyboardInterrupt:
                 logger.info("[Cancelled] Mapping was cancelled by user.")
@@ -129,7 +161,9 @@ async def map_async(
     async def get_data_async(
         self, client: httpx.AsyncClient, run_id: str
    ) -> dict[str, Any]:
-        get_response = await client.get("/map/run/data", params={"run_id": run_id})
+        get_response = await self.call_api_async(
+            client=client, url="/map/run/data", method="GET", params={"run_id": run_id}
+        )
         if get_response.status_code != 200:
             raise Exception(f"Failed to get data for run {run_id}: {get_response.text}")
         return get_response.json().get("data", {}) or {}

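The async map path follows the same contract: map_async now returns a job on failure rather than raising. A usage sketch under the same assumptions as above (the AiMap class name and import path are illustrative; map_async, run_id, message, and data come from the diff):

    # Async usage sketch; AiMap constructor and import path are assumptions.
    import asyncio

    from oxylabs_ai_studio.apps.ai_map import AiMap


    async def main() -> None:
        mapper = AiMap(api_key="YOUR_API_KEY")
        job = await mapper.map_async(url="https://example.com")
        if job.data is None:
            # Failed runs surface the error_code via `message` instead of raising.
            print(f"Map {job.run_id} failed: {job.message}")
        else:
            print(job.data)


    asyncio.run(main())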