66 lines
1.6 KiB
Python
66 lines
1.6 KiB
Python
"""Personio tap class."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from singer_sdk import Tap
|
|
from singer_sdk import typing as th # JSON schema typing helpers
|
|
|
|
# TODO: Import your custom stream types here:
|
|
from tap_personio import streams
|
|
|
|
|
|
class TapPersonio(Tap):
|
|
"""Personio tap class."""
|
|
|
|
name = "tap-personio"
|
|
|
|
config_jsonschema = th.PropertiesList(
|
|
th.Property(
|
|
"client_id",
|
|
th.StringType,
|
|
required=True,
|
|
secret=True,
|
|
description="The client id to authenticate against the Personio API",
|
|
),
|
|
th.Property(
|
|
"client_secret",
|
|
th.StringType,
|
|
required=True,
|
|
secret=True,
|
|
description="The client secret to authenticate against the Personio API",
|
|
),
|
|
th.Property(
|
|
"start_date",
|
|
th.DateTimeType,
|
|
description="The earliest record date to sync",
|
|
),
|
|
th.Property(
|
|
"partner_id",
|
|
th.StringType,
|
|
required=False,
|
|
default="",
|
|
description="The partner ID for the Personio API, if applicable",
|
|
),
|
|
th.Property(
|
|
"app_id",
|
|
th.StringType,
|
|
required=True,
|
|
default="",
|
|
description="The app ID for the Personio API",
|
|
),
|
|
).to_dict()
|
|
|
|
def discover_streams(self) -> list[streams.PersonioStream]:
|
|
"""Return a list of discovered streams.
|
|
|
|
Returns:
|
|
A list of discovered streams.
|
|
"""
|
|
return [
|
|
streams.EmployeesStream(self),
|
|
]
|
|
|
|
|
|
if __name__ == "__main__":
|
|
TapPersonio.cli()
|