This SDK provides the interface for writing UDFs and UDSinks in Python.
from pynumaflow.function import Messages, Message, Datum, UserDefinedFunctionServicer
def my_handler(key: str, datum: Datum) -> Messages:
val = datum.value
_ = datum.event_time
_ = datum.watermark
messages = Messages(Message.to_vtx(key, val))
return messages
if __name__ == "__main__":
grpc_server = UserDefinedFunctionServicer(map_handler=my_handler)
grpc_server.start()from typing import Iterator
from pynumaflow.function import Messages, Message, Datum, Metadata, UserDefinedFunctionServicer
def my_handler(key: str, datums: Iterator[Datum], md: Metadata) -> Messages:
interval_window = md.interval_window
counter = 0
for _ in datums:
counter += 1
msg = (
f"counter:{counter} interval_window_start:{interval_window.start} "
f"interval_window_end:{interval_window.end}"
)
return Messages(Message.to_vtx(key, str.encode(msg)))
if __name__ == "__main__":
grpc_server = UserDefinedFunctionServicer(reduce_handler=my_handler)
grpc_server.start()A sample UDF Dockerfile is provided under examples.
from typing import Iterator
from pynumaflow.sink import Datum, Responses, Response, UserDefinedSinkServicer
def my_handler(datums: Iterator[Datum]) -> Responses:
responses = Responses()
for msg in datums:
print("User Defined Sink", msg.value.decode("utf-8"))
responses.append(Response.as_success(msg.id))
return responses
if __name__ == "__main__":
grpc_server = UserDefinedSinkServicer(my_handler)
grpc_server.start()A sample UDSink Dockerfile is provided under examples.