Coverage for src / app / secure_system.py: 100%

58 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-23 14:54 +0000

1""" 

2Secure System Module. 

3 

4This module demonstrates a secure implementation of a user management service 

5following strict typing and security guidelines. 

6""" 

7 

8from abc import ABC, abstractmethod 

9from dataclasses import dataclass 

10from uuid import UUID, uuid4 

11 

12from pydantic import EmailStr, Field, SecretStr 

13from pydantic.networks import IPvAnyAddress 

14 

15from taipanstack.core.result import Err, Ok, Result 

16from taipanstack.security import SecureBaseModel, hash_password 

17from taipanstack.utils.logging import get_logger 

18 

# Module-level logger, requested from the project's logging helper with this
# module's name.
logger = get_logger(__name__)

21 

22 

class UserNotFoundError(Exception):
    """Signal that no user exists for a requested identifier."""

    def __init__(self, user_id: UUID) -> None:
        """
        Build the error for the given user ID.

        Args:
            user_id: The UUID that could not be resolved to a user.

        """
        description = f"User with ID {user_id} not found."
        super().__init__(description)
        # Kept on the instance so handlers can read the ID without parsing
        # the message text.
        self.user_id = user_id

30 

31 

class UserAlreadyExistsError(Exception):
    """Signal that the user in question already exists."""

    def __init__(self, message: str) -> None:
        """
        Build the error from a descriptive message.

        Args:
            message: Text describing the conflict; also used as the
                exception's string form.

        """
        super().__init__(message)
        self.message = message

39 

40 

@dataclass(frozen=True)
class UserCreationError(Exception):
    """
    Error value describing why a user could not be created.

    Attributes:
        message: Human-readable reason for the failure.

    """

    message: str = "Failed to create user"

    def __post_init__(self) -> None:
        """Pass the message to ``Exception`` so ``str()`` reports it."""
        # The dataclass-generated __init__ never calls Exception.__init__, so
        # an instance built with ``message=...`` would otherwise have empty
        # ``args`` and an empty string form.
        super().__init__(self.message)

46 

47 

class UserCreate(SecureBaseModel):
    """
    Input payload for registering a new user.

    Attributes:
        username: Login name; 3 to 50 characters, each an ASCII letter,
            digit or underscore.
        email: Email address of the user.
        password: Password supplied by the user, held as a ``SecretStr``.
        ip_address: Optional IP address the registration came from.

    """

    username: str = Field(
        ...,
        min_length=3,
        max_length=50,
        pattern=r"^[a-zA-Z0-9_]+$",
    )
    email: EmailStr
    password: SecretStr
    ip_address: IPvAnyAddress | None = None

64 

65 

class User(SecureBaseModel):
    """
    Model representing a registered user.

    Attributes:
        id: Unique identifier for the user.
        username: The username of the user.
        email: The email address of the user.
        password_hash: Hash of the user's password; the plain-text password
            is not a field of this model.

    """

    id: UUID
    username: str
    email: EmailStr
    password_hash: str

82 

83 

class UserRepository(ABC):
    """Interface that every user storage backend must implement."""

    @abstractmethod
    def save(self, user: User) -> None:
        """
        Persist the given user.

        Args:
            user: The user to store.

        """

    @abstractmethod
    def get_by_id(self, user_id: UUID) -> User | None:
        """
        Look up a user by identifier.

        Args:
            user_id: UUID of the user to fetch.

        Returns:
            The matching User, or None when there is none.

        """

109 

110 

111class InMemoryUserRepository(UserRepository): 

112 """In-memory implementation of UserRepository.""" 

113 

114 def __init__(self) -> None: 

115 """Initialize the in-memory repository.""" 

116 self._storage: dict[UUID, User] = {} 

117 

118 def save(self, user: User) -> None: 

119 """ 

120 Save a user to the in-memory storage. 

121 

122 Args: 

123 user: The user to save. 

124 

125 """ 

126 self._storage[user.id] = user 

127 

128 def get_by_id(self, user_id: UUID) -> User | None: 

129 """ 

130 Retrieve a user from the in-memory storage. 

131 

132 Args: 

133 user_id: The UUID of the user. 

134 

135 Returns: 

136 The User object if found, otherwise None. 

137 

138 """ 

139 return self._storage.get(user_id) 

140 

141 

class UserService:
    """Service layer for creating and looking up users."""

    def __init__(self, user_repository: UserRepository) -> None:
        """
        Bind the service to the repository it reads from and writes to.

        Args:
            user_repository: The repository to use for data access.

        """
        self._user_repository = user_repository

    def create_user(
        self, user_create: UserCreate
    ) -> Result[User, UserCreationError]:
        """
        Register a new user from validated creation data.

        Args:
            user_create: The user creation data.

        Returns:
            Ok(User) once the repository has stored the user, or
            Err(UserCreationError) when the repository raises
            UserAlreadyExistsError. Other exceptions propagate.

        """
        # Only the hash is placed on the User; hashing is delegated to
        # hash_password.
        hashed = hash_password(user_create.password)

        new_user = User(
            id=uuid4(),
            username=user_create.username,
            email=user_create.email,
            password_hash=hashed,
        )

        try:
            self._user_repository.save(new_user)
        except UserAlreadyExistsError as exc:
            logger.exception("Failed to create user", user_id=new_user.id)
            return Err(UserCreationError(message=str(exc)))

        logger.info("User created successfully", user_id=new_user.id)
        return Ok(new_user)

    def get_user(self, user_id: UUID) -> Result[User, UserNotFoundError]:
        """
        Look up a user by ID, reporting the outcome as a Result.

        Args:
            user_id: The UUID of the user.

        Returns:
            Ok(User) if found, Err(UserNotFoundError) if not found.

        Example:
            >>> match service.get_user(some_id):
            ...     case Ok(user):
            ...         print(f"Found: {user.username}")
            ...     case Err(error):
            ...         print(f"Not found: {error.user_id}")

        """
        found = self._user_repository.get_by_id(user_id)
        if found is not None:
            return Ok(found)

        logger.warning("User lookup failed", user_id=user_id)
        return Err(UserNotFoundError(user_id))

206 return Ok(user)