BaseLinker API getOrders — kompletny tutorial Python
Jak pobierać zamówienia z BaseLinker API metodą getOrders. Paginacja, filtrowanie po statusie, rate limiting 30 req/min, async client z retry. Gotowy kod Python.
LiveSales
getOrders to najczęściej wywoływana metoda BaseLinker API. Oto jak ją używać bez wpadania w pułapki.
“Pobieranie zamówień z BaseLinker brzmi prosto — dopóki nie trafisz na limit 30 zapytań na minutę, paginację po 100 wynikach i brak dokumentacji o tym, co zwraca
include_custom_extra_fields. Ten tutorial to wszystko, czego szukałem 6 miesięcy temu.”
— Developer po 3 miesiącach integracji z BL API
BaseLinker API nie ma oficjalnego SDK dla Pythona. Dokumentacja jest skąpa, a społeczność mała. Ten tutorial daje ci gotowy, przetestowany kod do pobierania zamówień — od prostego requesta po async client z paginacją, rate limiterem i retry.
Gotowy kod Python
httpx async client, paginacja, error handling. Kopiuj i używaj.
Rate limiting 30/min
Sliding window, exponential backoff, auto-retry na 429.
Paginacja + enrichment
Pobieranie wszystkich zamówień + wzbogacanie o dane produktów.
Podstawy: jeden request do getOrders
Endpoint i format
BaseLinker API to jeden endpoint dla wszystkich metod:
POST https://api.baselinker.com/connector.php
Każde zapytanie ma te same pola form-data:
method— nazwa metody API (np.getOrders)parameters— JSON string z parametrami metody
Autoryzacja przez header X-BLToken z tokenem API (generujesz w panelu BaseLinker → Moje konto → API).
Minimalny przykład (requests)
import json
import requests
TOKEN = "twoj-token-baselinker"
API_URL = "https://api.baselinker.com/connector.php"
response = requests.post(
API_URL,
headers={"X-BLToken": TOKEN},
data={
"method": "getOrders",
"parameters": json.dumps({
"status_id": 234562, # ID statusu "Do pakowania"
}),
},
)
data = response.json()
if data.get("status") == "ERROR":
print(f"Błąd: {data['error_code']} — {data['error_message']}")
else:
orders = data.get("orders", [])
print(f"Pobrano {len(orders)} zamówień")
To pobiera max 100 zamówień z danego statusu. Jeśli masz więcej — musisz paginować.
Parametry getOrders — co możesz filtrować
| Parametr | Typ | Opis | Uwagi |
|---|---|---|---|
| status_id | int | ID statusu zamówienia | Znajdziesz w panelu BL → Zamówienia → Statusy |
| id_from | int | Pobierz zamówienia z ID > podanej wartości | Klucz do paginacji. Domyślnie 0. |
| date_from | int (unix) | Zamówienia od tej daty (UNIX timestamp) | Alternatywa dla id_from |
| get_unconfirmed_orders | bool | Czy pobierać nieopłacone zamówienia | Domyślnie false. Ustaw true dla zamówień za pobraniem. |
| include_custom_extra_fields | int (0/1) | Dołącz pola dodatkowe (extra fields) do response | Ustaw 1 jeśli masz custom fieldy (np. strefa, gabaryt). |
| filter_email | string | Filtruj po mailu kupującego | Dokładne dopasowanie |
| filter_order_source | string | Filtruj po źródle (np. “allegro”, “amazon”) | Przydatne przy wielu marketplace’ach |
Jak znaleźć status_id
W panelu BaseLinker: Zamówienia → Statusy. Każdy status ma numeryczne ID. Typowe:
- “Nowe zamówienie” — ID widoczne po najechaniu na status
- “Do pakowania” — np.
234562 - “Spakowane” — np.
234563 - “Wysłane” — np.
234564
ID statusów są unikalne per konto BaseLinker — nie kopiuj cudzych.
Paginacja — jak pobrać wszystkie zamówienia
getOrders zwraca max 100 zamówień per request. Jeśli masz 350 zamówień w statusie “Do pakowania”, musisz zrobić 4 requesty.
Mechanizm paginacji
BaseLinker paginuje po id_from — nie po numerze strony. Algorytm:
- Pierwszy request:
id_from = 0(lub pominięty) - Weź
max(order_id)z wyników - Następny request:
id_from = max_id - Powtarzaj aż dostaniesz pustą listę lub < 100 wyników
Gotowa funkcja z paginacją
import json
import httpx
API_URL = "https://api.baselinker.com/connector.php"
async def fetch_all_orders(
token: str,
status_id: int,
max_pages: int = 50,
) -> list[dict]:
"""Pobierz wszystkie zamówienia z danego statusu, z paginacją."""
all_orders = []
last_id = 0
async with httpx.AsyncClient(timeout=30.0) as client:
for page in range(max_pages):
resp = await client.post(
API_URL,
headers={"X-BLToken": token},
data={
"method": "getOrders",
"parameters": json.dumps({
"status_id": status_id,
"id_from": last_id,
"get_unconfirmed_orders": False,
"include_custom_extra_fields": 1,
}),
},
)
resp.raise_for_status()
data = resp.json()
if data.get("status") == "ERROR":
raise RuntimeError(
f"BL error: {data.get('error_code')} "
f"{data.get('error_message')}"
)
orders = data.get("orders", [])
if not orders:
break
all_orders.extend(orders)
new_last_id = max(o["order_id"] for o in orders)
if new_last_id == last_id:
break # Safety: no new orders
last_id = new_last_id
if len(orders) < 100:
break # Last page
return all_orders
Pułapka: paginacja bez warunku stopu
Bez max_pages lub sprawdzenia new_last_id == last_id możesz wpaść w nieskończoną pętlę. BaseLinker nie zwraca “next page” tokena — musisz sam wykryć koniec. Sprawdzaj len(orders) < 100 jako warunek ostatniej strony.
Struktura response — co dostajesz
Format odpowiedzi
{
"status": "SUCCESS",
"orders": [
{
"order_id": 123456789,
"shop_order_id": "0",
"external_order_id": "allegro-abc-123",
"order_source": "allegro",
"order_source_id": 1234,
"order_source_info": "Allegro",
"order_status_id": 234562,
"date_add": 1711900000,
"date_confirmed": 1711900100,
"date_in_status": 1711900200,
"user_login": "kupujacy@email.pl",
"phone": "600123456",
"email": "kupujacy@email.pl",
"user_comments": "Proszę o szybką wysyłkę",
"admin_comments": "",
"currency": "PLN",
"payment_method": "Płatność online",
"payment_method_cod": "0",
"payment_done": "49.99",
"delivery_method": "InPost Paczkomaty",
"delivery_price": "8.99",
"delivery_fullname": "Jan Kowalski",
"delivery_company": "",
"delivery_address": "ul. Testowa 1",
"delivery_city": "Warszawa",
"delivery_postcode": "00-001",
"delivery_country_code": "PL",
"delivery_point_id": "WAW01A",
"delivery_point_name": "Paczkomat WAW01A",
"invoice_fullname": "Jan Kowalski",
"invoice_nip": "",
"want_invoice": "0",
"extra_field_1": "",
"extra_field_2": "",
"products": [
{
"product_id": "98765",
"variant_id": "0",
"name": "Kabel YDYp 3x2.5 - 15m",
"sku": "YDYP-3x25-015",
"ean": "5901234567890",
"quantity": 1,
"price_brutto": 41.00,
"tax_rate": 23,
"weight": 2.5
}
]
}
]
}
Kluczowe pola
Daty (UNIX timestamp)
date_add— data złożenia zamówieniadate_confirmed— data potwierdzenia/opłaceniadate_in_status— kiedy zamówienie weszło w bieżący status
Konwersja: datetime.fromtimestamp(ts, tz=timezone.utc)
Produkty (tablica)
product_id— ID produktu w katalogu BLvariant_id— ID wariantu (0 = brak wariantów)sku— kod SKUean— kod kreskowy EANquantity— ilość sztukprice_brutto— cena brutto per sztuka
include_custom_extra_fields = 1
Bez tego parametru nie dostaniesz extra_field_* w response. Jeśli masz custom fieldy na zamówieniach (np. strefa magazynowa, gabaryt, uwagi wewnętrzne) — zawsze ustawiaj na 1. Nie zwiększa to czasu response.
Rate limiting — 30 zapytań na minutę
BaseLinker limituje do 30 zapytań per 60 sekund per konto (per token API). Przekroczenie daje HTTP 429 (“Too many requests”).
Problem
Przy paginacji 5 stron (500 zamówień) + enrichment produktów (5 stron po 100) masz 10 zapytań. Jeśli jednocześnie odpytujesz o stock, triggery, dokumenty — 30 req/min to za mało.
Rozwiązanie: sliding window rate limiter
import asyncio
import time
from collections import deque
class SlidingWindowRateLimiter:
"""Rate limiter: max N requests per window_seconds."""
def __init__(self, max_requests: int = 30, window_seconds: float = 60.0):
self._max = max_requests
self._window = window_seconds
self._timestamps: deque[float] = deque()
async def wait_and_acquire(self) -> float:
"""Wait until a slot is available, then acquire it.
Returns the time waited (0 if no wait was needed).
"""
waited = 0.0
while True:
now = time.monotonic()
# Remove expired timestamps
while self._timestamps and self._timestamps[0] <= now - self._window:
self._timestamps.popleft()
if len(self._timestamps) < self._max:
self._timestamps.append(now)
return waited
# Wait until the oldest request expires
sleep_time = self._timestamps[0] + self._window - now + 0.05
await asyncio.sleep(sleep_time)
waited += sleep_time
Użycie: przed każdym requestem do BL API, wywołaj await rate_limiter.wait_and_acquire(). Jeśli slot jest wolny — od razu przechodzi. Jeśli nie — czeka aż najstarszy request “wypadnie” z okna.
Auto-retry z exponential backoff
Jakie błędy retryować
| Błąd | Retry? | Dlaczego |
|---|---|---|
| HTTP 429 | TAK | Rate limit — poczekaj i ponów. Backoff: 1s → 2s → 4s. |
| HTTP 5xx | TAK | Server error po stronie BL — przejściowy problem. |
| Timeout | ZALEŻY | Retry tylko dla operacji krytycznych (triggery, stock). Dla odczytów — lepiej użyć cache. |
| HTTP 400/404 | NIE | Błąd logiczny — zły parametr, nieistniejący zasób. Retry nic nie zmieni. |
| status: “ERROR” | NIE | Błąd logiczny BL (np. zły token, brak uprawnień). Retryowanie maskuje bugi. |
Implementacja retry
RETRY_BACKOFF = [1.0, 2.0, 4.0] # sekundy między kolejnymi próbami
MAX_RETRIES = 3
async def bl_call_with_retry(
client: httpx.AsyncClient,
token: str,
method: str,
params: dict,
) -> dict:
"""BaseLinker API call with exponential backoff retry."""
for attempt in range(MAX_RETRIES + 1):
try:
resp = await client.post(
"https://api.baselinker.com/connector.php",
headers={"X-BLToken": token},
data={
"method": method,
"parameters": json.dumps(params),
},
)
# Retry on rate limit
if resp.status_code == 429:
if attempt < MAX_RETRIES:
await asyncio.sleep(RETRY_BACKOFF[attempt])
continue
resp.raise_for_status()
# Retry on server error
if resp.status_code >= 500:
if attempt < MAX_RETRIES:
await asyncio.sleep(RETRY_BACKOFF[attempt])
continue
resp.raise_for_status()
resp.raise_for_status()
data = resp.json()
# BL logical error — nie retryuj
if data.get("status") == "ERROR":
raise RuntimeError(
f"{data.get('error_code')}: {data.get('error_message')}"
)
return data
except httpx.TimeoutException:
if attempt < MAX_RETRIES:
await asyncio.sleep(RETRY_BACKOFF[attempt])
continue
raise
raise RuntimeError(f"Failed after {MAX_RETRIES} retries")
Dwa konta BaseLinker — scalanie zamówień
Jeśli masz dwa (lub więcej) konta BaseLinker (np. osobne sklepy Allegro), możesz pobierać zamówienia równolegle i scalać je w jeden strumień.
Parallel fetch z asyncio.gather
async def fetch_merged_orders(
token_a: str, status_a: int,
token_b: str, status_b: int,
) -> list[dict]:
"""Pobierz zamówienia z dwóch kont BL równolegle i scal."""
orders_a, orders_b = await asyncio.gather(
fetch_all_orders(token_a, status_a),
fetch_all_orders(token_b, status_b),
)
# Oznacz konto źródłowe
for o in orders_a:
o["_account"] = "A"
for o in orders_b:
o["_account"] = "B"
# Scal i posortuj po dacie potwierdzenia
merged = orders_a + orders_b
merged.sort(key=lambda o: o.get("date_confirmed", 0))
return merged
Kolumna _account jest kluczowa
Gdy odpalasz trigger na zamówieniu (runOrderMacroTrigger) lub zmieniasz status (setOrderStatus), musisz użyć tokena z właściwego konta. Pole _account mówi ci, którego klienta BL użyć. Bez tego — trigger pójdzie na złe konto i dostaniesz error “order not found”.
Każde konto BL ma osobny limit 30 req/min, więc parallel fetch nie zabiera slotów z jednego konta na drugie.
Enrichment — wzbogacanie zamówień o dane produktów
getOrders zwraca podstawowe dane produktów (nazwa, SKU, EAN, ilość, cena). Ale nie zwraca:
- Custom fields produktu (np. strefa magazynowa, waga brutto, opakowanie zbiorcze)
- Zdjęcia produktu (thumbnail)
- Informacja o bundlach (zestawy produktów)
Do tego potrzebujesz getInventoryProductsData:
async def enrich_with_product_details(
token: str,
inventory_id: int,
orders: list[dict],
field_strefa: str = "extra_field_15043",
) -> list[dict]:
"""Dodaj strefę i thumbnail do produktów w zamówieniach."""
# Zbierz unikalne product_id ze wszystkich zamówień
product_ids = set()
for order in orders:
for item in order.get("products", []):
pid = int(item.get("variant_id") or 0) or int(
item.get("product_id") or 0
)
if pid:
product_ids.add(pid)
if not product_ids:
return orders
# Pobierz szczegóły w batchach po 100
details = {}
async with httpx.AsyncClient(timeout=30.0) as client:
for i in range(0, len(list(product_ids)), 100):
batch = list(product_ids)[i : i + 100]
data = await bl_call_with_retry(
client, token, "getInventoryProductsData",
{"inventory_id": inventory_id, "products": batch},
)
for pid_str, pdata in data.get("products", {}).items():
details[pid_str] = pdata
# Wzbogać produkty w zamówieniach
for order in orders:
for item in order.get("products", []):
pid = str(
int(item.get("variant_id") or 0)
or int(item.get("product_id") or 0)
)
detail = details.get(pid, {})
text_fields = detail.get("text_fields", {})
item["strefa"] = text_fields.get(field_strefa, "")
images = detail.get("images", {})
if images:
first_key = sorted(images.keys())[0]
item["thumbnail_url"] = images[first_key]
item["is_bundle"] = bool(detail.get("is_bundle", 0))
return orders
Caching produktów
Produkty zmieniają się rzadko (raz na godzinę max). Zamówienia — co 5 minut. Nie odpytuj getInventoryProductsData przy każdym sync — cachuj per product_id z TTL 5 minut:
# Redis caching (uproszczony)
product_cache = {} # W produkcji: Redis z TTL
async def get_product_detail(token, inventory_id, pid):
cache_key = f"product:{pid}:detail"
if cache_key in product_cache:
return product_cache[cache_key]
# ... fetch from BL API ...
product_cache[cache_key] = detail # TTL 300s w Redis
return detail
Kompletny async client — gotowy do produkcji
Poniżej zebrany client łączący wszystko: paginację, rate limiting, retry, enrichment.
"""BaseLinker getOrders client — production ready."""
import asyncio
import json
import logging
import time
from collections import deque
from dataclasses import dataclass
import httpx
logger = logging.getLogger(__name__)
BL_API = "https://api.baselinker.com/connector.php"
RATE_LIMIT = 30
RATE_WINDOW = 60.0
MAX_RETRIES = 3
BACKOFF = [1.0, 2.0, 4.0]
class BaseLinkerError(Exception):
def __init__(self, message: str, code: str = ""):
super().__init__(message)
self.code = code
class RateLimiter:
def __init__(self):
self._ts: deque[float] = deque()
async def acquire(self):
while True:
now = time.monotonic()
while self._ts and self._ts[0] <= now - RATE_WINDOW:
self._ts.popleft()
if len(self._ts) < RATE_LIMIT:
self._ts.append(now)
return
wait = self._ts[0] + RATE_WINDOW - now + 0.05
await asyncio.sleep(wait)
@dataclass
class BLClient:
token: str
_rate: RateLimiter = None
_http: httpx.AsyncClient = None
async def start(self):
self._rate = RateLimiter()
self._http = httpx.AsyncClient(timeout=60.0)
async def stop(self):
if self._http:
await self._http.aclose()
async def call(self, method: str, params: dict) -> dict:
for attempt in range(MAX_RETRIES + 1):
await self._rate.acquire()
try:
resp = await self._http.post(
BL_API,
headers={"X-BLToken": self.token},
data={
"method": method,
"parameters": json.dumps(params),
},
)
if resp.status_code in (429,) or resp.status_code >= 500:
if attempt < MAX_RETRIES:
await asyncio.sleep(BACKOFF[attempt])
continue
resp.raise_for_status()
data = resp.json()
if data.get("status") == "ERROR":
raise BaseLinkerError(
data.get("error_message", ""),
data.get("error_code", ""),
)
return data
except httpx.TimeoutException:
if attempt < MAX_RETRIES:
await asyncio.sleep(BACKOFF[attempt])
continue
raise
raise RuntimeError("Max retries exceeded")
async def get_orders(self, status_id: int) -> list[dict]:
"""Pobierz wszystkie zamówienia z paginacją."""
all_orders, last_id = [], 0
for _ in range(50):
data = await self.call("getOrders", {
"status_id": status_id,
"id_from": last_id,
"get_unconfirmed_orders": False,
"include_custom_extra_fields": 1,
})
orders = data.get("orders", [])
if not orders:
break
all_orders.extend(orders)
new_last = max(o["order_id"] for o in orders)
if new_last == last_id or len(orders) < 100:
break
last_id = new_last
return all_orders
# Użycie:
async def main():
client = BLClient(token="twoj-token")
await client.start()
orders = await client.get_orders(status_id=234562)
print(f"Pobrano {len(orders)} zamówień")
for o in orders[:3]:
print(
f" #{o['order_id']} — {o['delivery_fullname']} "
f"— {len(o['products'])} produktów"
)
await client.stop()
if __name__ == "__main__":
asyncio.run(main())
Pułapki i rzeczy, które dokumentacja pomija
Timeout 60s to minimum
BaseLinker bywa wolny przy dużych batchach (500+ zamówień). Timeout 30s powoduje fałszywe faile. Ustawiaj minimum 60 sekund na request. Dla operacji stockowych — nawet 90s.
variant_id vs product_id
Jeśli produkt ma warianty, variant_id to prawdziwy identyfikator. product_id to parent. Przy wzbogacaniu danych — bierz variant_id jeśli != 0, inaczej product_id.
date_confirmed = 0
Zamówienia za pobraniem nie mają date_confirmed (jest 0). Sortuj po date_add jako fallback albo ustaw get_unconfirmed_orders: true.
Rate limit jest per token, nie per IP
Dwa serwery z tym samym tokenem dzielą limit 30/min. Jeśli masz staging + produkcja na tym samym tokenie — staging potrafi “zjadać” limity produkcji. Używaj osobnych tokenów.
Persistent connection pooling na produkcji
Nie twórz nowego httpx.AsyncClient per request — każdy nowy client = nowy TCP handshake + TLS negotiation. Użyj jednego clienta dla całej aplikacji (singleton). W naszym systemie jeden httpx.AsyncClient żyje przez cały czas życia aplikacji i reużywa połączeń TCP. Różnica: ~200ms na request mniej.
Podsumowanie i co dalej
Ten tutorial pokrywa getOrders — najczęściej używaną metodę. Ale BaseLinker API ma więcej metod, które współpracują z zamówieniami:
runOrderMacroTrigger— odpalanie makr (drukowanie etykiet, zmiana statusu)setOrderStatus— programowa zmiana statusu zamówieniagetInventoryProductsData— szczegóły produktów (custom fields, zdjęcia)getInventoryProductsStock— stany magazynoweaddInventoryDocument— dokumenty magazynowe (przyjęcia, wydania, korekty)
Jak te metody współpracują w prawdziwym systemie pakowania — opisaliśmy w case study systemu Pakowanie Kablowo, gdzie getOrders to punkt startowy całego flow: zamówienie → paginacja → enrichment → widok pakowania → trigger → etykieta → wysyłka.
Kod z tego tutoriala jest uproszczoną wersją klienta, którego używamy w produkcji — pełny client ma dodatkowo kolejkę priorytetową (CRITICAL > HIGH > NORMAL > LOW), connection pooling i WebSocket broadcasts.
Zainteresowany automatyzacją danych?
LiveSales pomoże Ci zaoszczędzić czas i podejmować lepsze decyzje biznesowe dzięki automatycznym raportom i dashboardom.
Skontaktuj się z namiPodobał Ci się ten artykuł?
Subskrybuj, aby dostawać powiadomienia o nowych artykułach.
Bez spamu. Możesz się wypisać w każdej chwili.
Przeczytaj również
Ekosystem narzędzi: gdy sprzedaż, magazyn i Allegro Ads rozmawiają
Jak połączyliśmy narzędzie Allegro Ads z magazynem i działem sprzedaży. Magazyn wie co się sprzedaje, sprzedaż wie co na stanie — lepsze decyzje.
Headless BaseLinker — automatyzacja bez płatnych modułów za 1000 zł
Jak wysyłamy requesty do frontendu BaseLinkera przez konsolę, omijając płatne moduły. Repricing i kontrola cen za darmo zamiast 12 000 PLN rocznie.
Własne narzędzie do Allegro Ads — dlaczego zbudowaliśmy je od zera
Jak wygląda nasze wewnętrzne narzędzie do Allegro Ads. Kampanie po kategoriach, rentowność per oferta, repricing, auto-odnowienia — pełny przegląd modułów.