Source code for apps.api.views.device_view

from rest_framework import status
from rest_framework.response import Response
from apps.api.serializers.serializers import DeviceSerializer, DeviceTypeSerializer, SensorSerializer
from apps.core.models.device import Device
from apps.core.models.sensor import Sensor
from apps.core.models.device_type import DeviceType
from apps.api.exceptions import ApiException, ValidationException
from django.utils import timezone
from rest_framework.views import APIView

from apps.api.model_helper import *
from rest_framework.permissions import IsAuthenticated


class BulkDeviceManagement(APIView):
    """Bulk-create or bulk-soft-delete the devices of one device type."""

    # permission_classes = (IsAuthenticated,)

    def post(self, request, device_type_id):
        """Create every device in the request body under ``device_type_id``.

        The body must be a list of device objects. Each one is given the id
        of the device type returned by ``get_device_type``, overriding any
        "type" value it was sent with.

        Returns:
            A 201 response listing the created devices, each with its
            "type" replaced by the serialized device type.

        Raises:
            ValidationException: If the body is not a list of objects, or
                if ``DeviceSerializer`` rejects it.
        """
        data = request.data
        device_type = get_device_type(device_type_id)

        # Reject malformed payloads up front: iterating a dict or form data
        # yields strings, and item assignment on those would surface as an
        # unhandled TypeError (HTTP 500) rather than a client error.
        if not isinstance(data, list) or not all(
            isinstance(device, dict) for device in data
        ):
            raise ValidationException(
                "Validation error: expected a list of device objects",
                status.HTTP_400_BAD_REQUEST,
            )

        # Build a copy instead of mutating request.data in place.
        payload = [{**device, "type": device_type.id} for device in data]

        serializer = DeviceSerializer(data=payload, many=True)
        if not serializer.is_valid():
            raise ValidationException(
                f"Validation error: {serializer.errors}",
                status.HTTP_400_BAD_REQUEST,
            )
        serializer.save()

        # Return the full device type object instead of just its id.
        device_type_as_json = DeviceTypeSerializer(device_type).data
        created_devices = serializer.data
        for device in created_devices:
            device["type"] = device_type_as_json
        return Response(status=status.HTTP_201_CREATED, data=created_devices)

    def delete(self, request, device_type_id):
        """Soft-delete every device whose type is ``device_type_id``.

        Each matching device gets ``deleted_at`` set and is saved
        individually. Always answers 204, even when nothing matched.
        """
        devices = Device.objects.filter(type=device_type_id)
        # TODO: refactor
        # temporarily until the queryset manager is implemented
        # One timestamp for the whole batch, so every device removed by this
        # request shares the same deletion time.
        deleted_at = timezone.now()
        for dev in devices:
            dev.deleted_at = deleted_at
            dev.save()
        return Response(status=status.HTTP_204_NO_CONTENT)
class DeviceViewManagement(APIView):
    """List all devices, or create a single device."""

    # permission_classes = (IsAuthenticated,)

    def get(self, request):
        """Return every device with its "type" expanded to the full object.

        A device whose "type" matches no serialized device type id is
        returned unchanged (its "type" stays as serialized).
        """
        device_types = DeviceTypeSerializer(get_all_device_types(), many=True).data
        list_devices = list(DeviceSerializer(get_all_devices(), many=True).data)

        # Index device types by stringified id so each device is resolved
        # with one lookup instead of a scan over every device type.
        # setdefault keeps the first entry if an id appears more than once.
        types_by_id = {}
        for dev_type in device_types:
            types_by_id.setdefault(str(dev_type["id"]), dev_type)

        for dev in list_devices:
            dev_type = types_by_id.get(str(dev["type"]))
            if dev_type is not None:
                dev["type"] = dev_type
        return Response(status=status.HTTP_200_OK, data=list_devices)

    def post(self, request):
        """Create one device from the request body.

        Returns:
            A 201 response with the created device, its "type" replaced by
            the serialized device type.

        Raises:
            ValidationException: If ``DeviceSerializer`` rejects the body.
        """
        serializer = DeviceSerializer(data=request.data)
        if not serializer.is_valid():
            raise ValidationException(
                f"Validation error: {serializer.errors}",
                status.HTTP_400_BAD_REQUEST,
            )
        device = serializer.save()

        # Resolve the type from the saved instance rather than re-reading the
        # raw payload: this cannot KeyError and reflects what was persisted.
        device_type = get_device_type(device.type_id)
        device_json = serializer.data
        device_json["type"] = DeviceTypeSerializer(device_type).data
        return Response(status=status.HTTP_201_CREATED, data=device_json)
class DeviceViewDetail(APIView):
    """Retrieve, update or soft-delete a single device by id."""

    # permission_classes = (IsAuthenticated,)

    def get(self, request, device_id):
        """Return the device with its "type" expanded to the full object."""
        device = get_device(device_id)
        device_type = get_device_type(device.type_id)

        device_as_json = DeviceSerializer(device).data
        # returning full device_type object
        device_as_json["type"] = DeviceTypeSerializer(device_type).data
        return Response(status=status.HTTP_200_OK, data=device_as_json)

    def delete(self, request, device_id):
        """Soft-delete the device via ``soft_delete()`` and answer 204."""
        device = get_device(device_id)
        device.soft_delete()
        return Response(status=status.HTTP_204_NO_CONTENT)

    def put(self, request, device_id):
        """Update the device from the request body.

        Returns:
            A 200 response with the updated device, its "type" replaced by
            the serialized device type.

        Raises:
            ValidationException: If ``DeviceSerializer`` rejects the body.
        """
        device = get_device(device_id)
        serializer = DeviceSerializer(device, data=request.data)
        if not serializer.is_valid():
            raise ValidationException(
                f"Validation error: {serializer.errors}",
                status.HTTP_400_BAD_REQUEST,
            )
        updated_device = serializer.save()
        device_as_json = serializer.data

        # Look the type up from the saved instance (as get() does) instead of
        # the raw payload, whose "type" may be absent and would make the
        # lookup fail after the update has already been persisted.
        device_type = get_device_type(updated_device.type_id)
        # returning full device_type object
        device_as_json["type"] = DeviceTypeSerializer(device_type).data
        return Response(status=status.HTTP_200_OK, data=device_as_json)
class DeviceViewAllSensors(APIView):
    """List the sensors that belong to one device."""

    def get(self, request, device_id):
        """Return the serialized sensors whose "device" matches ``device_id``.

        All sensors from ``get_all_sensors`` are serialized first; the
        result is then narrowed by comparing string forms of the ids.
        """
        serialized_sensors = list(SensorSerializer(get_all_sensors(), many=True).data)

        wanted_device = str(device_id)
        send_sensor = []
        for sensor in serialized_sensors:
            if str(sensor["device"]) == wanted_device:
                send_sensor.append(sensor)

        return Response(status=status.HTTP_200_OK, data=send_sensor)