BaseLinker API getOrders — kompletny tutorial Python

Jak pobierać zamówienia z BaseLinker API metodą getOrders. Paginacja, filtrowanie po statusie, rate limiting 30 req/min, async client z retry. Gotowy kod Python.

L

LiveSales

17 min czytania

getOrders to najczęściej wywoływana metoda BaseLinker API. Oto jak ją używać bez wpadania w pułapki.

“Pobieranie zamówień z BaseLinker brzmi prosto — dopóki nie trafisz na limit 30 zapytań na minutę, paginację po 100 wynikach i brak dokumentacji o tym, co zwraca include_custom_extra_fields. Ten tutorial to wszystko, czego szukałem 6 miesięcy temu.”
— Developer po 3 miesiącach integracji z BL API

BaseLinker API nie ma oficjalnego SDK dla Pythona. Dokumentacja jest skąpa, a społeczność mała. Ten tutorial daje ci gotowy, przetestowany kod do pobierania zamówień — od prostego requesta po async client z paginacją, rate limiterem i retry.

Gotowy kod Python

httpx async client, paginacja, error handling. Kopiuj i używaj.

Rate limiting 30/min

Sliding window, exponential backoff, auto-retry na 429.

Paginacja + enrichment

Pobieranie wszystkich zamówień + wzbogacanie o dane produktów.


Podstawy: jeden request do getOrders

Endpoint i format

BaseLinker API to jeden endpoint dla wszystkich metod:

POST https://api.baselinker.com/connector.php

Każde zapytanie ma te same pola form-data:

  • method — nazwa metody API (np. getOrders)
  • parameters — JSON string z parametrami metody

Autoryzacja przez header X-BLToken z tokenem API (generujesz w panelu BaseLinker → Moje konto → API).

Minimalny przykład (requests)

import json
import requests

TOKEN = "twoj-token-baselinker"
API_URL = "https://api.baselinker.com/connector.php"

response = requests.post(
    API_URL,
    headers={"X-BLToken": TOKEN},
    data={
        "method": "getOrders",
        "parameters": json.dumps({
            "status_id": 234562,  # ID statusu "Do pakowania"
        }),
    },
)
data = response.json()

if data.get("status") == "ERROR":
    print(f"Błąd: {data['error_code']}{data['error_message']}")
else:
    orders = data.get("orders", [])
    print(f"Pobrano {len(orders)} zamówień")

To pobiera max 100 zamówień z danego statusu. Jeśli masz więcej — musisz paginować.


Parametry getOrders — co możesz filtrować

ParametrTypOpisUwagi
status_idintID statusu zamówieniaZnajdziesz w panelu BL → Zamówienia → Statusy
id_fromintPobierz zamówienia z ID > podanej wartościKlucz do paginacji. Domyślnie 0.
date_fromint (unix)Zamówienia od tej daty (UNIX timestamp)Alternatywa dla id_from
get_unconfirmed_ordersboolCzy pobierać nieopłacone zamówieniaDomyślnie false. Ustaw true dla zamówień za pobraniem.
include_custom_extra_fieldsint (0/1)Dołącz pola dodatkowe (extra fields) do responseUstaw 1 jeśli masz custom fieldy (np. strefa, gabaryt).
filter_emailstringFiltruj po mailu kupującegoDokładne dopasowanie
filter_order_sourcestringFiltruj po źródle (np. “allegro”, “amazon”)Przydatne przy wielu marketplace’ach

Jak znaleźć status_id

W panelu BaseLinker: Zamówienia → Statusy. Każdy status ma numeryczne ID. Typowe:

  • “Nowe zamówienie” — ID widoczne po najechaniu na status
  • “Do pakowania” — np. 234562
  • “Spakowane” — np. 234563
  • “Wysłane” — np. 234564

ID statusów są unikalne per konto BaseLinker — nie kopiuj cudzych.


Paginacja — jak pobrać wszystkie zamówienia

getOrders zwraca max 100 zamówień per request. Jeśli masz 350 zamówień w statusie “Do pakowania”, musisz zrobić 4 requesty.

Mechanizm paginacji

BaseLinker paginuje po id_from — nie po numerze strony. Algorytm:

  1. Pierwszy request: id_from = 0 (lub pominięty)
  2. Weź max(order_id) z wyników
  3. Następny request: id_from = max_id
  4. Powtarzaj aż dostaniesz pustą listę lub < 100 wyników

Gotowa funkcja z paginacją

import json
import httpx

API_URL = "https://api.baselinker.com/connector.php"


async def fetch_all_orders(
    token: str,
    status_id: int,
    max_pages: int = 50,
) -> list[dict]:
    """Pobierz wszystkie zamówienia z danego statusu, z paginacją."""
    all_orders = []
    last_id = 0

    async with httpx.AsyncClient(timeout=30.0) as client:
        for page in range(max_pages):
            resp = await client.post(
                API_URL,
                headers={"X-BLToken": token},
                data={
                    "method": "getOrders",
                    "parameters": json.dumps({
                        "status_id": status_id,
                        "id_from": last_id,
                        "get_unconfirmed_orders": False,
                        "include_custom_extra_fields": 1,
                    }),
                },
            )
            resp.raise_for_status()
            data = resp.json()

            if data.get("status") == "ERROR":
                raise RuntimeError(
                    f"BL error: {data.get('error_code')} "
                    f"{data.get('error_message')}"
                )

            orders = data.get("orders", [])
            if not orders:
                break

            all_orders.extend(orders)

            new_last_id = max(o["order_id"] for o in orders)
            if new_last_id == last_id:
                break  # Safety: no new orders
            last_id = new_last_id

            if len(orders) < 100:
                break  # Last page

    return all_orders

Pułapka: paginacja bez warunku stopu

Bez max_pages lub sprawdzenia new_last_id == last_id możesz wpaść w nieskończoną pętlę. BaseLinker nie zwraca “next page” tokena — musisz sam wykryć koniec. Sprawdzaj len(orders) < 100 jako warunek ostatniej strony.


Struktura response — co dostajesz

Format odpowiedzi

{
  "status": "SUCCESS",
  "orders": [
    {
      "order_id": 123456789,
      "shop_order_id": "0",
      "external_order_id": "allegro-abc-123",
      "order_source": "allegro",
      "order_source_id": 1234,
      "order_source_info": "Allegro",
      "order_status_id": 234562,
      "date_add": 1711900000,
      "date_confirmed": 1711900100,
      "date_in_status": 1711900200,
      "user_login": "kupujacy@email.pl",
      "phone": "600123456",
      "email": "kupujacy@email.pl",
      "user_comments": "Proszę o szybką wysyłkę",
      "admin_comments": "",
      "currency": "PLN",
      "payment_method": "Płatność online",
      "payment_method_cod": "0",
      "payment_done": "49.99",
      "delivery_method": "InPost Paczkomaty",
      "delivery_price": "8.99",
      "delivery_fullname": "Jan Kowalski",
      "delivery_company": "",
      "delivery_address": "ul. Testowa 1",
      "delivery_city": "Warszawa",
      "delivery_postcode": "00-001",
      "delivery_country_code": "PL",
      "delivery_point_id": "WAW01A",
      "delivery_point_name": "Paczkomat WAW01A",
      "invoice_fullname": "Jan Kowalski",
      "invoice_nip": "",
      "want_invoice": "0",
      "extra_field_1": "",
      "extra_field_2": "",
      "products": [
        {
          "product_id": "98765",
          "variant_id": "0",
          "name": "Kabel YDYp 3x2.5 - 15m",
          "sku": "YDYP-3x25-015",
          "ean": "5901234567890",
          "quantity": 1,
          "price_brutto": 41.00,
          "tax_rate": 23,
          "weight": 2.5
        }
      ]
    }
  ]
}

Kluczowe pola

Daty (UNIX timestamp)
  • date_add — data złożenia zamówienia
  • date_confirmed — data potwierdzenia/opłacenia
  • date_in_status — kiedy zamówienie weszło w bieżący status

Konwersja: datetime.fromtimestamp(ts, tz=timezone.utc)

Produkty (tablica)
  • product_id — ID produktu w katalogu BL
  • variant_id — ID wariantu (0 = brak wariantów)
  • sku — kod SKU
  • ean — kod kreskowy EAN
  • quantity — ilość sztuk
  • price_brutto — cena brutto per sztuka

include_custom_extra_fields = 1

Bez tego parametru nie dostaniesz extra_field_* w response. Jeśli masz custom fieldy na zamówieniach (np. strefa magazynowa, gabaryt, uwagi wewnętrzne) — zawsze ustawiaj na 1. Nie zwiększa to czasu response.


Rate limiting — 30 zapytań na minutę

BaseLinker limituje do 30 zapytań per 60 sekund per konto (per token API). Przekroczenie daje HTTP 429 (“Too many requests”).

Problem

Przy paginacji 5 stron (500 zamówień) + enrichment produktów (5 stron po 100) masz 10 zapytań. Jeśli jednocześnie odpytujesz o stock, triggery, dokumenty — 30 req/min to za mało.

Rozwiązanie: sliding window rate limiter

import asyncio
import time
from collections import deque


class SlidingWindowRateLimiter:
    """Rate limiter: max N requests per window_seconds."""

    def __init__(self, max_requests: int = 30, window_seconds: float = 60.0):
        self._max = max_requests
        self._window = window_seconds
        self._timestamps: deque[float] = deque()

    async def wait_and_acquire(self) -> float:
        """Wait until a slot is available, then acquire it.
        Returns the time waited (0 if no wait was needed).
        """
        waited = 0.0
        while True:
            now = time.monotonic()
            # Remove expired timestamps
            while self._timestamps and self._timestamps[0] <= now - self._window:
                self._timestamps.popleft()

            if len(self._timestamps) < self._max:
                self._timestamps.append(now)
                return waited

            # Wait until the oldest request expires
            sleep_time = self._timestamps[0] + self._window - now + 0.05
            await asyncio.sleep(sleep_time)
            waited += sleep_time

Użycie: przed każdym requestem do BL API, wywołaj await rate_limiter.wait_and_acquire(). Jeśli slot jest wolny — od razu przechodzi. Jeśli nie — czeka aż najstarszy request “wypadnie” z okna.


Auto-retry z exponential backoff

Jakie błędy retryować

BłądRetry?Dlaczego
HTTP 429TAKRate limit — poczekaj i ponów. Backoff: 1s → 2s → 4s.
HTTP 5xxTAKServer error po stronie BL — przejściowy problem.
TimeoutZALEŻYRetry tylko dla operacji krytycznych (triggery, stock). Dla odczytów — lepiej użyć cache.
HTTP 400/404NIEBłąd logiczny — zły parametr, nieistniejący zasób. Retry nic nie zmieni.
status: “ERROR”NIEBłąd logiczny BL (np. zły token, brak uprawnień). Retryowanie maskuje bugi.

Implementacja retry

RETRY_BACKOFF = [1.0, 2.0, 4.0]  # sekundy między kolejnymi próbami
MAX_RETRIES = 3


async def bl_call_with_retry(
    client: httpx.AsyncClient,
    token: str,
    method: str,
    params: dict,
) -> dict:
    """BaseLinker API call with exponential backoff retry."""
    for attempt in range(MAX_RETRIES + 1):
        try:
            resp = await client.post(
                "https://api.baselinker.com/connector.php",
                headers={"X-BLToken": token},
                data={
                    "method": method,
                    "parameters": json.dumps(params),
                },
            )

            # Retry on rate limit
            if resp.status_code == 429:
                if attempt < MAX_RETRIES:
                    await asyncio.sleep(RETRY_BACKOFF[attempt])
                    continue
                resp.raise_for_status()

            # Retry on server error
            if resp.status_code >= 500:
                if attempt < MAX_RETRIES:
                    await asyncio.sleep(RETRY_BACKOFF[attempt])
                    continue
                resp.raise_for_status()

            resp.raise_for_status()
            data = resp.json()

            # BL logical error — nie retryuj
            if data.get("status") == "ERROR":
                raise RuntimeError(
                    f"{data.get('error_code')}: {data.get('error_message')}"
                )

            return data

        except httpx.TimeoutException:
            if attempt < MAX_RETRIES:
                await asyncio.sleep(RETRY_BACKOFF[attempt])
                continue
            raise

    raise RuntimeError(f"Failed after {MAX_RETRIES} retries")

Dwa konta BaseLinker — scalanie zamówień

Jeśli masz dwa (lub więcej) konta BaseLinker (np. osobne sklepy Allegro), możesz pobierać zamówienia równolegle i scalać je w jeden strumień.

Parallel fetch z asyncio.gather

async def fetch_merged_orders(
    token_a: str, status_a: int,
    token_b: str, status_b: int,
) -> list[dict]:
    """Pobierz zamówienia z dwóch kont BL równolegle i scal."""
    orders_a, orders_b = await asyncio.gather(
        fetch_all_orders(token_a, status_a),
        fetch_all_orders(token_b, status_b),
    )

    # Oznacz konto źródłowe
    for o in orders_a:
        o["_account"] = "A"
    for o in orders_b:
        o["_account"] = "B"

    # Scal i posortuj po dacie potwierdzenia
    merged = orders_a + orders_b
    merged.sort(key=lambda o: o.get("date_confirmed", 0))

    return merged

Kolumna _account jest kluczowa

Gdy odpalasz trigger na zamówieniu (runOrderMacroTrigger) lub zmieniasz status (setOrderStatus), musisz użyć tokena z właściwego konta. Pole _account mówi ci, którego klienta BL użyć. Bez tego — trigger pójdzie na złe konto i dostaniesz error “order not found”.

Każde konto BL ma osobny limit 30 req/min, więc parallel fetch nie zabiera slotów z jednego konta na drugie.


Enrichment — wzbogacanie zamówień o dane produktów

getOrders zwraca podstawowe dane produktów (nazwa, SKU, EAN, ilość, cena). Ale nie zwraca:

  • Custom fields produktu (np. strefa magazynowa, waga brutto, opakowanie zbiorcze)
  • Zdjęcia produktu (thumbnail)
  • Informacja o bundlach (zestawy produktów)

Do tego potrzebujesz getInventoryProductsData:

async def enrich_with_product_details(
    token: str,
    inventory_id: int,
    orders: list[dict],
    field_strefa: str = "extra_field_15043",
) -> list[dict]:
    """Dodaj strefę i thumbnail do produktów w zamówieniach."""
    # Zbierz unikalne product_id ze wszystkich zamówień
    product_ids = set()
    for order in orders:
        for item in order.get("products", []):
            pid = int(item.get("variant_id") or 0) or int(
                item.get("product_id") or 0
            )
            if pid:
                product_ids.add(pid)

    if not product_ids:
        return orders

    # Pobierz szczegóły w batchach po 100
    details = {}
    async with httpx.AsyncClient(timeout=30.0) as client:
        for i in range(0, len(list(product_ids)), 100):
            batch = list(product_ids)[i : i + 100]
            data = await bl_call_with_retry(
                client, token, "getInventoryProductsData",
                {"inventory_id": inventory_id, "products": batch},
            )
            for pid_str, pdata in data.get("products", {}).items():
                details[pid_str] = pdata

    # Wzbogać produkty w zamówieniach
    for order in orders:
        for item in order.get("products", []):
            pid = str(
                int(item.get("variant_id") or 0)
                or int(item.get("product_id") or 0)
            )
            detail = details.get(pid, {})
            text_fields = detail.get("text_fields", {})

            item["strefa"] = text_fields.get(field_strefa, "")

            images = detail.get("images", {})
            if images:
                first_key = sorted(images.keys())[0]
                item["thumbnail_url"] = images[first_key]

            item["is_bundle"] = bool(detail.get("is_bundle", 0))

    return orders

Caching produktów

Produkty zmieniają się rzadko (raz na godzinę max). Zamówienia — co 5 minut. Nie odpytuj getInventoryProductsData przy każdym sync — cachuj per product_id z TTL 5 minut:

# Redis caching (uproszczony)
product_cache = {}  # W produkcji: Redis z TTL

async def get_product_detail(token, inventory_id, pid):
    cache_key = f"product:{pid}:detail"
    if cache_key in product_cache:
        return product_cache[cache_key]

    # ... fetch from BL API ...
    product_cache[cache_key] = detail  # TTL 300s w Redis
    return detail

Kompletny async client — gotowy do produkcji

Poniżej zebrany client łączący wszystko: paginację, rate limiting, retry, enrichment.

"""BaseLinker getOrders client — production ready."""

import asyncio
import json
import logging
import time
from collections import deque
from dataclasses import dataclass

import httpx

logger = logging.getLogger(__name__)

BL_API = "https://api.baselinker.com/connector.php"
RATE_LIMIT = 30
RATE_WINDOW = 60.0
MAX_RETRIES = 3
BACKOFF = [1.0, 2.0, 4.0]


class BaseLinkerError(Exception):
    def __init__(self, message: str, code: str = ""):
        super().__init__(message)
        self.code = code


class RateLimiter:
    def __init__(self):
        self._ts: deque[float] = deque()

    async def acquire(self):
        while True:
            now = time.monotonic()
            while self._ts and self._ts[0] <= now - RATE_WINDOW:
                self._ts.popleft()
            if len(self._ts) < RATE_LIMIT:
                self._ts.append(now)
                return
            wait = self._ts[0] + RATE_WINDOW - now + 0.05
            await asyncio.sleep(wait)


@dataclass
class BLClient:
    token: str
    _rate: RateLimiter = None
    _http: httpx.AsyncClient = None

    async def start(self):
        self._rate = RateLimiter()
        self._http = httpx.AsyncClient(timeout=60.0)

    async def stop(self):
        if self._http:
            await self._http.aclose()

    async def call(self, method: str, params: dict) -> dict:
        for attempt in range(MAX_RETRIES + 1):
            await self._rate.acquire()
            try:
                resp = await self._http.post(
                    BL_API,
                    headers={"X-BLToken": self.token},
                    data={
                        "method": method,
                        "parameters": json.dumps(params),
                    },
                )
                if resp.status_code in (429,) or resp.status_code >= 500:
                    if attempt < MAX_RETRIES:
                        await asyncio.sleep(BACKOFF[attempt])
                        continue
                resp.raise_for_status()
                data = resp.json()
                if data.get("status") == "ERROR":
                    raise BaseLinkerError(
                        data.get("error_message", ""),
                        data.get("error_code", ""),
                    )
                return data
            except httpx.TimeoutException:
                if attempt < MAX_RETRIES:
                    await asyncio.sleep(BACKOFF[attempt])
                    continue
                raise
        raise RuntimeError("Max retries exceeded")

    async def get_orders(self, status_id: int) -> list[dict]:
        """Pobierz wszystkie zamówienia z paginacją."""
        all_orders, last_id = [], 0
        for _ in range(50):
            data = await self.call("getOrders", {
                "status_id": status_id,
                "id_from": last_id,
                "get_unconfirmed_orders": False,
                "include_custom_extra_fields": 1,
            })
            orders = data.get("orders", [])
            if not orders:
                break
            all_orders.extend(orders)
            new_last = max(o["order_id"] for o in orders)
            if new_last == last_id or len(orders) < 100:
                break
            last_id = new_last
        return all_orders


# Użycie:
async def main():
    client = BLClient(token="twoj-token")
    await client.start()

    orders = await client.get_orders(status_id=234562)
    print(f"Pobrano {len(orders)} zamówień")

    for o in orders[:3]:
        print(
            f"  #{o['order_id']}{o['delivery_fullname']} "
            f"— {len(o['products'])} produktów"
        )

    await client.stop()


if __name__ == "__main__":
    asyncio.run(main())

Pułapki i rzeczy, które dokumentacja pomija

Timeout 60s to minimum

BaseLinker bywa wolny przy dużych batchach (500+ zamówień). Timeout 30s powoduje fałszywe faile. Ustawiaj minimum 60 sekund na request. Dla operacji stockowych — nawet 90s.

variant_id vs product_id

Jeśli produkt ma warianty, variant_id to prawdziwy identyfikator. product_id to parent. Przy wzbogacaniu danych — bierz variant_id jeśli != 0, inaczej product_id.

date_confirmed = 0

Zamówienia za pobraniem nie mają date_confirmed (jest 0). Sortuj po date_add jako fallback albo ustaw get_unconfirmed_orders: true.

Rate limit jest per token, nie per IP

Dwa serwery z tym samym tokenem dzielą limit 30/min. Jeśli masz staging + produkcja na tym samym tokenie — staging potrafi “zjadać” limity produkcji. Używaj osobnych tokenów.

Persistent connection pooling na produkcji

Nie twórz nowego httpx.AsyncClient per request — każdy nowy client = nowy TCP handshake + TLS negotiation. Użyj jednego clienta dla całej aplikacji (singleton). W naszym systemie jeden httpx.AsyncClient żyje przez cały czas życia aplikacji i reużywa połączeń TCP. Różnica: ~200ms na request mniej.


Podsumowanie i co dalej

1 endpoint
connector.php dla wszystkiego
30 req/min
rate limit per token
100/page
max zamówień per request
60s timeout
minimum dla produkcji

Ten tutorial pokrywa getOrders — najczęściej używaną metodę. Ale BaseLinker API ma więcej metod, które współpracują z zamówieniami:

  • runOrderMacroTrigger — odpalanie makr (drukowanie etykiet, zmiana statusu)
  • setOrderStatus — programowa zmiana statusu zamówienia
  • getInventoryProductsData — szczegóły produktów (custom fields, zdjęcia)
  • getInventoryProductsStock — stany magazynowe
  • addInventoryDocument — dokumenty magazynowe (przyjęcia, wydania, korekty)

Jak te metody współpracują w prawdziwym systemie pakowania — opisaliśmy w case study systemu Pakowanie Kablowo, gdzie getOrders to punkt startowy całego flow: zamówienie → paginacja → enrichment → widok pakowania → trigger → etykieta → wysyłka.

Kod z tego tutoriala jest uproszczoną wersją klienta, którego używamy w produkcji — pełny client ma dodatkowo kolejkę priorytetową (CRITICAL > HIGH > NORMAL > LOW), connection pooling i WebSocket broadcasts.

Zainteresowany automatyzacją danych?

LiveSales pomoże Ci zaoszczędzić czas i podejmować lepsze decyzje biznesowe dzięki automatycznym raportom i dashboardom.

Skontaktuj się z nami

Podobał Ci się ten artykuł?

Subskrybuj, aby dostawać powiadomienia o nowych artykułach.

Bez spamu. Możesz się wypisać w każdej chwili.