import atexit
import json
import logging
import socket
import time
from dataclasses import fields
from typing import Any
from dc3client.models import (
ActualMove,
Concede,
Coordinate,
DCNotFoundError,
ExtraEndScore,
ExtraEndThinkingTime,
Finish,
Frame,
GameResult,
GameResultNotFoundError,
GameRule,
IsReady,
IsReadyNotFoundError,
LastMove,
MatchData,
NewGame,
PlayerInfo,
Players,
Position,
Scores,
ServerDC,
Setting,
ShotInfo,
Simulator,
Start,
State,
StoneRotation,
Stones,
ThinkingTime,
ThinkingTimeRemaining,
Trajectory,
Update,
Velocity,
Version,
)
[docs]
class BaseClient:
def __init__(
self, timeout: int = 10, buffer: int = 1024, log_level: int = logging.INFO
):
"""base client initialize
Args:
timeout (int, optional): Time to client timeout. Defaults to 10.
buffer (int, optional): Buffer size. Defaults to 1024.
log_level (int, optional): Minimum level of logging. Defaults to logging.INFO.
"""
self.socket = None # socket object
self.address = None # address of server
self.timeout = timeout # timeout in seconds
self.buffer = buffer # buffer size in bytes
self.message_buffer = ""
self.logger = logging.getLogger("socket_client")
self.logger.setLevel(log_level)
formatter = logging.Formatter(
"%(asctime)s - %(levelname)s:%(name)s - %(message)s"
)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
self.logger.addHandler(stream_handler)
# Get current time for rate limit
self.last_send = time.time()
# Rate limit time interval
self.rate_limit = 3.0
def _connect(self, address: tuple):
self.logger.info(f"Connect to {address}")
self.address = address # address of server
self.socket = socket.socket(
socket.AF_INET, socket.SOCK_STREAM, 0
) # create socket object
self.socket.settimeout(self.timeout) # set timeout
self.socket.connect(self.address) # connect to server
self.logger.info(f"Connect to {address} success")
if self.socket is None:
self.logger.error("Socket is None")
raise Exception("Socket is None")
# Close socket on exit
atexit.register(self.__shutdown)
[docs]
def send(self, message: str = ""):
"""send message to server
Args:
message (str, optional): massage content. Defaults to "".
"""
if message == "":
self.logger.info("In Manual Mode")
while True:
message = input("> ")
self.socket.send(message.encode("utf-8")) # type: ignore
self.logger.info(f"Send message : {message}")
else:
while True:
if (
wait_time := ((now := time.time()) - self.last_send)
) > self.rate_limit:
self.socket.send(message.encode("utf-8")) # type: ignore
self.logger.info(f"Send message : {message}")
self.last_send = now
break
else:
self.logger.debug(f"Rate limit {self.rate_limit} seconds")
self.logger.debug(f"Please wait {wait_time + .5} seconds")
time.sleep(wait_time + 0.5)
[docs]
def receive(self) -> dict[str, Any]:
"""receive message from server until "\n"
Returns:
dict[str, Any]: received message
"""
message = ""
while True:
if self.message_buffer != "":
message_recv = self.message_buffer
self.message_buffer = ""
else:
message_recv: str = self.socket.recv(self.buffer).decode("utf-8") # type: ignore
message += message_recv
if "\n" in message_recv:
# split message by "\n"
message_split = message.split("\n")
# set first message to message
message = message_split[0]
# set second and later message to message_buffer
self.message_buffer = "\n".join(message_split[1:])
break
s = json.dumps(message)
json_data = json.loads(s)
dict_data = json.loads(json_data)
return dict_data
def __shutdown(self):
"""shutdown socket"""
self.logger.info("Shutdown socket")
try:
self.socket.shutdown(socket.SHUT_RDWR) # type: ignore
self.socket.close() # type: ignore
self.logger.info("Shutdown socket success")
except BaseException as e:
self.logger.error(f"Shutdown socket failed {e}")
pass
[docs]
class SocketClient(BaseClient):
def __init__(
self,
host: str = "dc3-server",
port: int = 10000,
client_name: str = "AI0",
auto_start: bool = True,
rate_limit: float = 0.2,
) -> None:
"""initialize socket client
Args:
host (str, optional): URL of the server of Digital Curling 3. Defaults to "dc3-server".
port (int, optional): Connection port to Digital Curling Server. Defaults to 10000.
client_name (str, optional): Identification name of the client. Defaults to "AI0".
auto_start (bool, optional): Whether to start the game automatically. Defaults to True.
rate_limit (int, optional): Minimum time interval to send data to the server. Defaults to 3.
"""
self.is_connected = False
self.server = (host, port)
super().__init__(timeout=60, buffer=1024)
self.obj_dict = {}
self.match_data = MatchData()
self.move_info: list[ShotInfo] = []
# Name of the client
self.client_name = client_name
self.rate_limit = rate_limit
if auto_start:
self.start_game()
[docs]
def start_game(self):
"""start game"""
super()._connect(self.server)
self.dc_recv()
self.dc_ok()
self.is_ready_recv()
self.ready_ok()
self.get_new_game()
def __stone_trajectory_parser(self, message_recv: list) -> list:
"""Convert stone trajectory to data class
Args:
message_recv (list): List of stone trajectory
Returns:
list: List of data classes for stone trajectory
"""
team_stone: list = []
for data in message_recv:
if data is None:
team_stone.append(
Coordinate(angle=None, position=[Position(x=None, y=None)])
)
else:
team_stone.append(
Coordinate(
angle=data["angle"],
position=[
Position(x=data["position"]["x"], y=data["position"]["y"])
],
)
)
return team_stone
[docs]
def dc_recv(self):
"""receive dc"""
message_recv = self.receive()
version = Version(
major=message_recv["version"]["major"],
minor=message_recv["version"]["minor"],
)
dc = ServerDC(
game_id=message_recv["game_id"],
cmd=message_recv["cmd"],
version=version,
date_time=message_recv["date_time"],
)
self.match_data.server_dc = dc
[docs]
def dc_ok(self) -> None:
"""send dc_ok"""
message: dict = {"cmd": "dc_ok", "name": self.client_name}
message_str: str = json.dumps(message)
message_str: str = message_str + "\n"
self.send(message_str)
[docs]
def is_ready_recv(self):
"""receive is_ready"""
message_recv = self.receive()
thinking_time = ThinkingTime(
team0=message_recv["game"]["setting"]["thinking_time"]["team0"],
team1=message_recv["game"]["setting"]["thinking_time"]["team1"],
)
extra_end_thinking_time = ExtraEndThinkingTime(
team0=message_recv["game"]["setting"]["extra_end_thinking_time"]["team0"],
team1=message_recv["game"]["setting"]["extra_end_thinking_time"]["team1"],
)
simulator = Simulator(
simulator_type=message_recv["game"]["simulator"]["type"],
seconds_per_frame=message_recv["game"]["simulator"]["seconds_per_frame"],
)
team0_players = [
PlayerInfo(
max_speed=data["max_speed"],
seed=None,
stddev_angle=data["stddev_angle"],
stddev_speed=data["stddev_speed"],
randomness=data["type"],
)
for data in message_recv["game"]["players"]["team0"]
]
team1_players = [
PlayerInfo(
randomness=data["type"],
stddev_speed=data["stddev_speed"],
stddev_angle=data["stddev_angle"],
max_speed=data["max_speed"],
seed=None,
)
for data in message_recv["game"]["players"]["team1"]
]
players = Players(team0=team0_players, team1=team1_players)
game_setting = Setting(
max_end=message_recv["game"]["setting"]["max_end"],
sheet_width=message_recv["game"]["setting"]["sheet_width"],
five_rock_rule=message_recv["game"]["setting"]["five_rock_rule"],
thinking_time=thinking_time,
extra_end_thinking_time=extra_end_thinking_time,
)
game = GameRule(
rule=message_recv["game"]["rule"],
setting=game_setting,
players=players,
simulator=simulator,
)
is_ready = IsReady(
cmd=message_recv["cmd"], team=message_recv["team"], game=game
)
self.match_data.is_ready = is_ready
[docs]
def ready_ok(self, player_order: list = [0, 1, 2, 3]) -> None:
"""send ready_ok"""
ready = {"cmd": "ready_ok", "player_order": player_order}
ready_str = json.dumps(ready)
ready_str = ready_str + "\n"
self.send(ready_str)
[docs]
def get_new_game(self):
"""receive new_game"""
message_recv = self.receive()
self.match_data.new_game = NewGame(
cmd=message_recv["cmd"], name=message_recv["name"]
)
if self.match_data.new_game is not None:
self.is_connected = True
[docs]
def update(self):
"""receive update"""
if self.is_connected is False:
raise Exception("Not connected to server")
message_recv = self.receive()
start_team0_position = []
start_team1_position = []
finish_team0_position = []
finish_team1_position = []
frame_data = []
last_move = None
trajectory = None
if message_recv["cmd"] != "update":
return
if message_recv["state"]["game_result"] is None:
winner = None
reason = None
else:
winner = message_recv["state"]["game_result"]["winner"]
reason = message_recv["state"]["game_result"]["reason"]
game_result = GameResult(
winner=winner,
reason=reason,
)
extra_end_score = ExtraEndScore(
team0=message_recv["state"]["extra_end_score"]["team0"],
team1=message_recv["state"]["extra_end_score"]["team1"],
)
scores = Scores(
team0=message_recv["state"]["scores"]["team0"],
team1=message_recv["state"]["scores"]["team1"],
)
stones = Stones(
team0=self.__stone_trajectory_parser(
message_recv["state"]["stones"]["team0"]
),
team1=self.__stone_trajectory_parser(
message_recv["state"]["stones"]["team1"]
),
)
thinking_time_remaining = ThinkingTimeRemaining(
team0=message_recv["state"]["thinking_time_remaining"]["team0"],
team1=message_recv["state"]["thinking_time_remaining"]["team1"],
)
if message_recv["last_move"] is None:
free_guard_zone_foul = False
cmd_type = None
rotation = None
x = None
y = None
seconds_per_frame = None
else:
last_move = message_recv["last_move"]
actual_move = message_recv["last_move"]["actual_move"]
free_guard_zone_foul = message_recv["last_move"]["free_guard_zone_foul"]
cmd_type = message_recv["last_move"]["actual_move"]["type"]
if cmd_type == "shot":
velocity = message_recv["last_move"]["actual_move"]["velocity"]
rotation = message_recv["last_move"]["actual_move"]["rotation"]
x = message_recv["last_move"]["actual_move"]["velocity"]["x"]
y = message_recv["last_move"]["actual_move"]["velocity"]["y"]
if message_recv["last_move"]["trajectory"] is None:
seconds_per_frame = None
else:
seconds_per_frame = message_recv["last_move"]["trajectory"][
"seconds_per_frame"
]
start_team0_position = self.__stone_trajectory_parser(
message_recv["last_move"]["trajectory"]["start"]["team0"]
)
start_team1_position = self.__stone_trajectory_parser(
message_recv["last_move"]["trajectory"]["start"]["team1"]
)
finish_team0_position = self.__stone_trajectory_parser(
message_recv["last_move"]["trajectory"]["finish"]["team0"]
)
finish_team1_position = self.__stone_trajectory_parser(
message_recv["last_move"]["trajectory"]["finish"]["team1"]
)
for frames in message_recv["last_move"]["trajectory"]["frames"]:
for frame in frames:
if frame is None or frame["value"] is None:
frame_data.append(
Frame(
team=None,
index=None,
value=Coordinate(
angle=None,
position=[Position(x=None, y=None)],
),
)
)
else:
frame_data.append(
Frame(
team=frame["team"],
index=frame["index"],
value=Coordinate(
angle=frame["value"]["angle"],
position=[
Position(
x=frame["value"]["position"]["x"],
y=frame["value"]["position"]["y"],
)
],
),
)
)
velocity = Velocity(x=x, y=y)
actual_move = ActualMove(
rotation=rotation,
type=cmd_type,
velocity=velocity,
)
else: # cmd_type == "concede"
if cmd_type != "concede":
raise Exception(f"cmd_type is not concede. cmd_type : {cmd_type}")
seconds_per_frame = message_recv["last_move"]["trajectory"][
"seconds_per_frame"
]
actual_move = Concede()
start = Start(
team0=start_team0_position,
team1=start_team1_position,
)
finish = Finish(
team0=finish_team0_position,
team1=finish_team1_position,
)
trajectory = Trajectory(
seconds_per_frame=seconds_per_frame,
start=start,
finish=finish,
frames=frame_data,
)
last_move = LastMove(
actual_move=actual_move,
free_guard_zone_foul=free_guard_zone_foul,
trajectory=trajectory,
)
state = State(
end=message_recv["state"]["end"],
extra_end_score=extra_end_score,
game_result=game_result,
hammer=message_recv["state"]["hammer"],
scores=scores,
shot=message_recv["state"]["shot"],
stones=stones,
thinking_time_remaining=thinking_time_remaining,
)
update_info = Update(
cmd=message_recv["cmd"],
last_move=last_move,
next_team=message_recv["next_team"],
state=state,
)
self.logger.info(f"next_team : {update_info.next_team}")
self.match_data.update_list.append(update_info)
[docs]
def move(
self,
x: float = 0.0,
y: float = 2.4,
rotation: StoneRotation = StoneRotation.outturn,
) -> None:
"""Shot Stone
Args:
x (float, optional): Velocity in x-axis direction. Defaults to 0.0.
y (float, optional): Velocity in y-axis direction. Defaults to 2.4.
rotation (StoneRotation, optional): Rotation of stone. Defaults to StoneRotation.outturn.
"""
if self.is_connected is False:
raise Exception("Not connected to server")
self.move_info.append(
ShotInfo(
velocity_x=x,
velocity_y=y,
rotation=rotation,
)
)
shot = {
"cmd": "move",
"move": {
"type": "shot",
"velocity": {"x": x, "y": y},
"rotation": rotation,
},
}
s = json.dumps(shot)
s = s + "\n"
self.send(s)
[docs]
def concede(self) -> None:
"""Concede"""
concede = {"cmd": "move", "move": {"type": "concede"}}
s = json.dumps(concede)
s = s + "\n"
self.send(s)
[docs]
def get_my_team(self) -> str:
"""get my team name
Returns:
str: my team name
"""
if self.match_data.is_ready is None:
raise IsReadyNotFoundError("is_ready is None")
return self.match_data.is_ready.team
[docs]
def get_next_team(self) -> str:
"""get next team name"""
return self.match_data.update_list[-1].next_team
[docs]
def get_match_data(self) -> MatchData:
"""get match data"""
return self.match_data
[docs]
def get_winner(self) -> str | None:
"""get winner"""
if self.match_data.update_list[-1].state.game_result is None:
raise GameResultNotFoundError("game_result is None")
return self.match_data.update_list[-1].state.game_result.winner
[docs]
def get_move_info(self) -> list[ShotInfo]:
"""get move info"""
return self.move_info
[docs]
def get_update_and_trajectory(
self, remove_trajectory: bool = True
) -> tuple[list[Update], list[Trajectory]]:
"""get update and trajectory
Args:
remove_trajectory (bool, optional): Delete trajectory data from Update?. Defaults to True.
Returns:
tuple[list[Update], list[Trajectory]]: update list and trajectory list
"""
update_list: list[Update] = []
trajectory_list: list[Trajectory] = []
for update_data in self.match_data.update_list:
if (
update_data.last_move is not None
and update_data.last_move.trajectory is not None
):
trajectory_list.append(update_data.last_move.trajectory)
if remove_trajectory is True:
data = update_data # not to change original data
if data.last_move is not None:
data.last_move.trajectory = None
update_list.append(data)
else:
update_list.append(update_data)
return update_list, trajectory_list
[docs]
def get_dc(self) -> ServerDC:
"""get dc data"""
if self.match_data.server_dc is None:
raise DCNotFoundError("dc_data is None")
return self.match_data.server_dc
[docs]
def get_is_ready(self) -> IsReady:
"""get is_ready data"""
if self.match_data.is_ready is None:
raise IsReadyNotFoundError("is_ready_data is None")
return self.match_data.is_ready
[docs]
def convert_dc(self, dc_data: ServerDC) -> dict[str, Any]:
"""convert ServerDC to dict"""
dc_dict = {}
for field in fields(dc_data):
value = getattr(dc_data, field.name)
if field.name == "version":
dc_dict["version"] = {}
for field in fields(value):
dc_version = getattr(value, field.name)
dc_dict["version"][field.name] = dc_version
else:
dc_dict[field.name] = value
return dc_dict
[docs]
def convert_is_ready(self, is_ready: IsReady) -> dict[str, Any]:
"""convert IsReady to dict
Args:
is_ready (IsReady): is_ready
Returns:
dict: converted is_ready
"""
is_ready_dict = {}
for field in fields(is_ready):
value = getattr(is_ready, field.name)
if field.name == "game":
is_ready_dict[field.name] = {}
for field in fields(value):
game_value = getattr(value, field.name)
if field.name == "setting":
is_ready_dict["game"][field.name] = {}
for field in fields(game_value):
config_value = getattr(game_value, field.name)
if field.name == "thinking_time":
is_ready_dict["game"]["setting"][field.name] = {}
for field in fields(config_value):
thinking_time_value = getattr(
config_value, field.name
)
is_ready_dict["game"]["setting"]["thinking_time"][
field.name
] = thinking_time_value
elif field.name == "extra_end_thinking_time":
is_ready_dict["game"]["setting"][field.name] = {}
for field in fields(config_value):
extra_end_thinking_time_value = getattr(
config_value, field.name
)
is_ready_dict["game"]["setting"][
"extra_end_thinking_time"
][field.name] = extra_end_thinking_time_value
else:
is_ready_dict["game"]["setting"][
field.name
] = config_value
if field.name == "simulator":
is_ready_dict["game"][field.name] = {}
for field in fields(game_value):
simulator_value = getattr(game_value, field.name)
is_ready_dict["game"]["simulator"][
field.name
] = simulator_value
if field.name == "players":
is_ready_dict["game"][field.name] = {}
for field in fields(game_value):
team_data = getattr(game_value, field.name)
is_ready_dict["game"]["players"][field.name] = []
players_list = []
for i in team_data:
team_dict = {}
for field in fields(i):
normal0 = getattr(i, field.name)
team_dict[field.name] = normal0
players_list.append(team_dict)
is_ready_dict["game"]["players"][field.name] = players_list
else:
is_ready_dict["game"][field.name] = game_value
else:
is_ready_dict[field.name] = value
return is_ready_dict
[docs]
def convert_update(
self, update_data: Update, remove_trajectory: bool = True
) -> dict[str, Any]:
"""convert Update to dict
Args:
update_data (Update): Update
remove_trajectory (bool): Delete trajectory data from Update? Defaults to True.
Returns:
dict: converted Update
"""
update_dict: dict = {}
for field in fields(update_data):
update_value = getattr(update_data, field.name)
if field.name == "state":
update_dict[field.name] = {}
for field in fields(update_value):
state_value = getattr(update_value, field.name)
update_dict["state"][field.name] = {}
if field.name == "extra_end_score":
update_dict["state"][field.name] = {}
for field in fields(state_value):
extra_end_score_value = getattr(state_value, field.name)
update_dict["state"]["extra_end_score"][
field.name
] = extra_end_score_value
elif field.name == "game_result":
update_dict["state"][field.name] = {}
for field in fields(state_value):
game_result_value = getattr(state_value, field.name)
update_dict["state"]["game_result"][
field.name
] = game_result_value
elif field.name == "scores":
update_dict["state"][field.name] = {}
for field in fields(state_value):
scores_value = getattr(state_value, field.name)
update_dict["state"]["scores"][field.name] = scores_value
elif field.name == "stones":
for field in fields(state_value):
stones_value = getattr(state_value, field.name)
update_dict["state"]["stones"][field.name] = []
state_stones_team_list = []
for i in stones_value:
state_stone_team_dict: dict = {}
for team in fields(i):
team_value = getattr(i, team.name)
if team.name == "position":
for pos in team_value:
state_stone_team_dict["position"] = {}
for position in fields(pos):
team_position_value = getattr(
pos, position.name
)
state_stone_team_dict["position"][
position.name
] = team_position_value
else:
state_stone_team_dict[team.name] = team_value
state_stones_team_list.append(state_stone_team_dict)
update_dict["state"]["stones"][
field.name
] = state_stones_team_list
elif field.name == "thinking_time_remaining":
update_dict["state"][field.name] = {}
for field in fields(state_value):
thinking_time_remaining_value = getattr(
state_value, field.name
)
update_dict["state"]["thinking_time_remaining"][
field.name
] = thinking_time_remaining_value
else:
update_dict["state"][field.name] = state_value
elif field.name == "last_move":
if update_value is None:
update_dict[field.name] = None
continue
update_dict[field.name] = {}
for field in fields(update_value):
last_move_value = getattr(update_value, field.name)
if field.name == "actual_move":
update_dict["last_move"][field.name] = {}
for field in fields(last_move_value):
actual_move_value = getattr(last_move_value, field.name)
if field.name == "velocity":
update_dict["last_move"]["actual_move"][field.name] = {}
for field in fields(actual_move_value):
velocity_value = getattr(
actual_move_value, field.name
)
update_dict["last_move"]["actual_move"]["velocity"][
field.name
] = velocity_value
else:
update_dict["last_move"]["actual_move"][
field.name
] = actual_move_value
elif field.name == "trajectory":
if remove_trajectory is True:
update_dict["last_move"][field.name] = None
else:
update_dict["last_move"][field.name] = {}
for field in fields(last_move_value):
trajectory_value = getattr(last_move_value, field.name)
if field.name == "start":
update_dict["last_move"]["trajectory"][
field.name
] = {}
for field in fields(trajectory_value):
start_value = getattr(
trajectory_value, field.name
)
update_dict["last_move"]["trajectory"]["start"][
field.name
] = []
start_team_list = []
for field in fields(start_value):
start_team_dict = {}
start_team_value = getattr(
start_value, field.name
)
if field.name == "position":
start_team_dict["position"] = {}
for field in fields(start_team_value):
start_team0_position_value = (
getattr(
start_team_value, field.name
)
)
start_team_dict["position"][
field.name
] = start_team0_position_value
else:
start_team_dict[
field.name
] = start_team_value
start_team_list.append(start_team_dict)
update_dict["last_move"]["trajectory"]["start"][
field.name
] = start_team_list
elif field.name == "finish":
update_dict["last_move"]["trajectory"][
field.name
] = {}
for field in fields(trajectory_value):
finish_value = getattr(
trajectory_value, field.name
)
update_dict["last_move"]["trajectory"][
"finish"
][field.name] = []
finish_team0_list = []
for field in fields(finish_value):
finish_team_dict = {}
finish_team_value = getattr(
finish_value, field.name
)
if field.name == "position":
finish_team_dict["position"] = {}
for field in fields(finish_team_value):
finish_team_position_value = (
getattr(
finish_team_value,
field.name,
)
)
finish_team_dict["position"][
field.name
] = finish_team_position_value
else:
finish_team_dict[
field.name
] = finish_team_value
finish_team0_list.append(finish_team_dict)
update_dict["last_move"]["trajectory"][
"finish"
][field.name] = finish_team0_list
elif field.name == "frames":
update_dict["last_move"]["trajectory"][
field.name
] = []
for field in fields(trajectory_value):
frames_list = []
frames_dict = {}
frames_value = getattr(
trajectory_value, field.name
)
if field.name == "value":
frames_dict[field.name] = {}
for field in fields(frames_value):
frames_value_value = getattr(
frames_value, field.name
)
if field.name == "position":
frames_dict["value"][
field.name
] = {}
for field in fields(
frames_value_value
):
frames_value_position_value = (
getattr(
frames_value_value,
field.name,
)
)
frames_dict["value"][
"position"
][
field.name
] = frames_value_position_value
else:
frames_dict["value"][
field.name
] = frames_value_value
else:
frames_dict[field.name] = frames_value
frames_list.append(frames_dict)
update_dict["last_move"]["trajectory"][
"frames"
].append(frames_list)
else:
update_dict["last_move"]["trajectory"][
field.name
] = trajectory_value
else:
update_dict["last_move"][field.name] = last_move_value
else:
update_dict[field.name] = update_value
return update_dict
[docs]
def convert_trajectory(self, trajectory_data: Trajectory) -> dict[str, Any]:
"""convert trajectory to dict
Args:
trajectory_data (Trajectory): trajectory
Returns:
dict[str, Any]: dict of trajectory
"""
trajectory_dict = {}
for field in fields(trajectory_data):
value = getattr(trajectory_data, field.name)
if field.name == "start":
trajectory_dict[field.name] = {}
for field in fields(value):
start_value = getattr(value, field.name)
# if field.name == "team0":
trajectory_dict["start"][field.name] = []
start_team_list = []
for i in start_value:
start_team_dict = {}
for start_field in fields(i):
start_team_value = getattr(i, start_field.name)
if start_field.name == "position":
for j in start_team_value:
start_team_dict["position"] = {}
for pos_field in fields(j):
start_team_position_value = getattr(
j, pos_field.name
)
start_team_dict["position"][
pos_field.name
] = start_team_position_value
else:
start_team_dict[start_field.name] = start_team_value
start_team_list.append(start_team_dict)
trajectory_dict["start"][field.name] = start_team_list
elif field.name == "finish":
trajectory_dict[field.name] = {}
for field in fields(value):
finish_value = getattr(value, field.name)
trajectory_dict["finish"][field.name] = []
finish_team_list = []
for i in finish_value:
finish_team_dict = {}
for field in fields(i):
finish_team_value = getattr(i, field.name)
if field.name == "position":
for j in finish_team_value:
finish_team_dict["position"] = {}
for finish_field in fields(j):
finish_team_position_value = getattr(
j, finish_field.name
)
finish_team_dict["position"][
finish_field.name
] = finish_team_position_value
else:
finish_team_dict[field.name] = finish_team_value
finish_team_list.append(finish_team_dict)
trajectory_dict["finish"][field.name] = finish_team_list
elif field.name == "frames":
trajectory_dict[field.name] = []
for i in value:
frames_dict = {}
frames_list = []
for field in fields(i):
frame_value = getattr(i, field.name)
if field.name == "value":
frames_dict[field.name] = {}
for frame_field in fields(frame_value):
frame_data_value = getattr(
frame_value, frame_field.name
)
if frame_field.name == "position":
frames_dict["value"][frame_field.name] = {}
for j in frame_data_value:
for frame_field in fields(j):
frame_data_position_value = getattr(
j, frame_field.name
)
frames_dict["value"]["position"][
frame_field.name
] = frame_data_position_value
else:
frames_dict["value"][
frame_field.name
] = frame_data_value
else:
frames_dict[field.name] = frame_value
frames_list.append(frames_dict)
trajectory_dict["frames"] += frames_list
else:
trajectory_dict[field.name] = value
return trajectory_dict