Source code for apps.api.views.device_view
from rest_framework import status
from rest_framework.response import Response
from apps.api.serializers.serializers import DeviceSerializer, DeviceTypeSerializer, SensorSerializer
from apps.core.models.device import Device
from apps.core.models.sensor import Sensor
from apps.core.models.device_type import DeviceType
from apps.api.exceptions import ApiException, ValidationException
from django.utils import timezone
from rest_framework.views import APIView
from apps.api.model_helper import *
from rest_framework.permissions import IsAuthenticated
class BulkDeviceManagement(APIView):
    """Bulk operations on all devices that share one device type."""

    # NOTE(review): the explicit IsAuthenticated requirement is commented out,
    # so whatever default permission classes the project configures apply.
    # permission_classes = (IsAuthenticated,)

    def post(self, request, device_type_id):
        """Create several devices that all belong to ``device_type_id``.

        The request body must be a list of device objects. The ``type`` of
        every entry is overwritten with the id of the device type looked up
        from ``device_type_id``.

        Returns:
            A 201 response whose body is the list of created devices, each
            with ``type`` replaced by the serialized device type.

        Raises:
            ValidationException: if the body is not a list of objects, or
                if ``DeviceSerializer`` rejects it.
        """
        payload = request.data
        # Looked up first so an unknown device type is reported before any
        # payload problem (presumably get_device_type raises when the id
        # does not exist -- confirm in model_helper).
        device_type = get_device_type(device_type_id)

        # Without this guard a dict/str body or non-object items would fail
        # with a TypeError on the item assignment instead of a 400.
        if not isinstance(payload, list) or not all(
            isinstance(entry, dict) for entry in payload
        ):
            raise ValidationException(
                "Validation error: expected a list of device objects",
                status.HTTP_400_BAD_REQUEST,
            )

        # Build new dicts rather than mutating request.data in place.
        devices = [{**entry, "type": device_type.id} for entry in payload]

        serializer = DeviceSerializer(data=devices, many=True)
        if not serializer.is_valid():
            raise ValidationException(
                f"Validation error: {serializer.errors}", status.HTTP_400_BAD_REQUEST
            )
        serializer.save()

        # Return the full device_type object instead of just its id.
        device_type_as_json = DeviceTypeSerializer(device_type).data
        created = serializer.data
        for device in created:
            device["type"] = device_type_as_json
        return Response(status=status.HTTP_201_CREATED, data=created)

    def delete(self, request, device_type_id):
        """Mark every device of type ``device_type_id`` as deleted.

        Sets ``deleted_at`` on each matching device and saves it.

        Returns:
            An empty 204 response.
        """
        devices = Device.objects.filter(type=device_type_id)
        # TODO: refactor
        # temporarily until the queryset manager is implemented
        # One timestamp for the whole request, so all rows of a single bulk
        # delete carry the same deletion time.
        deleted_at = timezone.now()
        for dev in devices:
            dev.deleted_at = deleted_at
            dev.save()
        return Response(status=status.HTTP_204_NO_CONTENT)
class DeviceViewManagement(APIView):
    """Collection endpoint: list all devices or create a single device."""

    # NOTE(review): the explicit IsAuthenticated requirement is commented out,
    # so whatever default permission classes the project configures apply.
    # permission_classes = (IsAuthenticated,)

    def get(self, request):
        """Return all devices, each with its device type expanded.

        Every serialized device whose ``type`` matches the ``id`` of a
        serialized device type (compared as strings) has ``type`` replaced by
        that serialized device type. Devices with no match keep the raw value.

        Returns:
            A 200 response whose body is the list of serialized devices.
        """
        device_types = DeviceTypeSerializer(get_all_device_types(), many=True).data

        # Index the types once so each device is resolved with one lookup
        # instead of a scan over every type.
        type_by_id = {}
        for dev_type in device_types:
            # setdefault keeps the first entry should an id ever repeat.
            type_by_id.setdefault(str(dev_type["id"]), dev_type)

        devices = list(DeviceSerializer(get_all_devices(), many=True).data)
        for dev in devices:
            matched_type = type_by_id.get(str(dev["type"]))
            if matched_type is not None:
                dev["type"] = matched_type
        return Response(status=status.HTTP_200_OK, data=devices)

    def post(self, request):
        """Create one device from the request body.

        Returns:
            A 201 response whose body is the serialized device, with ``type``
            replaced by the serialized device type.

        Raises:
            ValidationException: if ``DeviceSerializer`` rejects the body.
        """
        serializer = DeviceSerializer(data=request.data)
        if not serializer.is_valid():
            raise ValidationException(
                f"Validation error: {serializer.errors}", status.HTTP_400_BAD_REQUEST
            )
        device = serializer.save()

        # Resolve the type from the saved instance rather than the raw
        # payload: it is the value that was actually stored, and it cannot
        # raise KeyError after the row has already been written.
        device_type = get_device_type(device.type_id)
        device_json = serializer.data
        device_json["type"] = DeviceTypeSerializer(device_type).data
        return Response(status=status.HTTP_201_CREATED, data=device_json)
class DeviceViewDetail(APIView):
    """Detail endpoint: read, update or soft-delete a single device."""

    # NOTE(review): the explicit IsAuthenticated requirement is commented out,
    # so whatever default permission classes the project configures apply.
    # permission_classes = (IsAuthenticated,)

    def get(self, request, device_id):
        """Return one device with its device type expanded.

        Returns:
            A 200 response whose body is the serialized device, with ``type``
            replaced by the serialized device type.
        """
        device = get_device(device_id)
        device_type = get_device_type(device.type_id)
        device_as_json = DeviceSerializer(device).data
        # Return the full device_type object instead of just its id.
        device_as_json["type"] = DeviceTypeSerializer(device_type).data
        return Response(status=status.HTTP_200_OK, data=device_as_json)

    def delete(self, request, device_id):
        """Soft-delete one device via ``Device.soft_delete()``.

        Returns:
            An empty 204 response.
        """
        device = get_device(device_id)
        device.soft_delete()
        return Response(status=status.HTTP_204_NO_CONTENT)

    def put(self, request, device_id):
        """Update one device from the request body.

        Returns:
            A 200 response whose body is the serialized updated device, with
            ``type`` replaced by the serialized device type.

        Raises:
            ValidationException: if ``DeviceSerializer`` rejects the body.
        """
        device = get_device(device_id)
        serializer = DeviceSerializer(device, data=request.data)
        if not serializer.is_valid():
            raise ValidationException(f"Validation error: {serializer.errors}", status.HTTP_400_BAD_REQUEST)
        updated_device = serializer.save()
        device_as_json = serializer.data

        # Resolve the type from the saved instance (as get() does) instead of
        # the raw payload, so the response reflects what was stored and a
        # missing "type" key cannot turn into a lookup with None.
        device_type = get_device_type(updated_device.type_id)
        # Return the full device_type object instead of just its id.
        device_as_json["type"] = DeviceTypeSerializer(device_type).data
        return Response(status=status.HTTP_200_OK, data=device_as_json)
class DeviceViewAllSensors(APIView):
    """List the sensors that belong to one device."""

    def get(self, request, device_id):
        """Return the serialized sensors whose ``device`` equals ``device_id``.

        All sensors are serialized first and then filtered in Python; both
        sides of the comparison are converted to strings.

        Returns:
            A 200 response whose body is the list of matching sensors.
        """
        # NOTE(review): filtering in the database would avoid serializing
        # every sensor, but that depends on what get_all_sensors() returns --
        # confirm before changing.
        serialized_sensors = SensorSerializer(get_all_sensors(), many=True).data
        wanted_device = str(device_id)

        matching = []
        for sensor in serialized_sensors:
            if str(sensor["device"]) == wanted_device:
                matching.append(sensor)
        return Response(status=status.HTTP_200_OK, data=matching)