qiskit_classroom.worker
worker for convert and visualize expressions
1""" 2 worker for convert and visualize expressions 3""" 4 5# Licensed to the Apache Software Foundation (ASF) under one 6# or more contributor license agreements. See the NOTICE file 7# distributed with this work for additional information 8# regarding copyright ownership. The ASF licenses this file 9# to you under the Apache License, Version 2.0 (the 10# "License"); you may not use this file except in compliance 11# with the License. You may obtain a copy of the License at 12# 13# http://www.apache.org/licenses/LICENSE-2.0 14# 15# Unless required by applicable law or agreed to in writing, 16# software distributed under the License is distributed on an 17# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 18# KIND, either express or implied. See the License for the 19# specific language governing permissions and limitations 20# under the License. 21 22import asyncio 23import datetime 24import random 25import os 26import string 27import sys 28import matplotlib as mpl 29import matplotlib.pyplot as plt 30from .expression_enum import QuantumExpression 31from .input_model import Input, QuantumCircuitInput, MatrixInput 32 33mpl.rcParams["font.size"] = 9 34mpl.rcParams["text.usetex"] = True 35mpl.rcParams["text.latex.preamble"] = r"\usepackage{{amsmath}}" 36 37ARRAY_TO_LATEX_IMPORT = "from qiskit.visualization import array_to_latex" 38CONVERTER_IMPORT = "from qiskit_class_converter import ConversionService" 39 40 41def add_new_line(strings: list[str]) -> str: 42 """add \\n between every line 43 44 Args: 45 strings (list[str]): list of line 46 47 Returns: 48 str: joined string with \\n 49 """ 50 return "\n".join(strings) 51 52 53# pylint: disable=too-many-instance-attributes 54class ConverterWorker: 55 """worker for convert expression and visualize expression""" 56 57 def __init__( 58 self, 59 from_expression: QuantumExpression, 60 to_expression: QuantumExpression, 61 input_data: Input, 62 expression_text: str, 63 shows_result: bool, 64 ) -> None: 65 self.from_expression = from_expression 66 self.to_expression = to_expression 67 self.__injected_sourcecode_path = ConverterWorker.generate_random_file_name() 68 69 # copy text 70 self.expression_text = "" + expression_text 71 self.input_data = input_data 72 self.shows_result = shows_result 73 74 @staticmethod 75 def generate_random_file_name() -> str: # pragma: no cover 76 # this method implmented with random function 77 """return generated file name 78 79 Returns: 80 str: generated file name 81 """ 82 return "".join(random.choice(string.ascii_letters) for _ in range(10)) + ".py" 83 84 @staticmethod 85 def write_converting_code(file_path: str, code: str) -> bool: # pragma: no cover 86 """write code to file_path 87 88 Args: 89 file_path (str): target 90 code (str): contents 91 92 Returns: 93 bool: is succesful 94 """ 95 try: 96 with open(file_path, mode="w", encoding="UTF-8") as file: 97 file.write(code) 98 except FileNotFoundError: 99 return False 100 return True 101 102 def __generate_code(self): # pragma: no cover 103 expression_text = self.expression_text 104 if self.from_expression is QuantumExpression.MATRIX: 105 input_data: MatrixInput = self.input_data 106 expression_text = f"{input_data.value_name}={expression_text}" 107 ConverterWorker.write_converting_code( 108 self.__injected_sourcecode_path, 109 add_new_line( 110 [ 111 expression_text, 112 CONVERTER_IMPORT, 113 ARRAY_TO_LATEX_IMPORT, 114 self.generate_conversion_code(), 115 self.generate_visualization_code(), 116 ] 117 ), 118 ) 119 120 def generate_conversion_code(self) -> str: 121 """generate the conversion code according to the conversion method. 122 123 Returns: 124 str: generated conversion code 125 """ 126 if self.to_expression == self.from_expression: 127 return "" 128 matrix_to_qc_option: dict[str, str] = {"label": "unitary gate"} 129 default_option: dict[str, str] = {"print": "raw"} 130 131 option: dict[str, str] = {} 132 if self.to_expression is QuantumExpression.CIRCUIT: 133 option = matrix_to_qc_option 134 else: 135 option = default_option 136 first_line = ( 137 "converter = ConversionService(conversion_type=" 138 + f"'{self.from_expression.value[1]}_TO_{self.to_expression.value[1]}', " 139 + f"option={option})" 140 ) 141 next_line: str = "" 142 if self.from_expression is QuantumExpression.CIRCUIT: 143 quantum_circuit_input: QuantumCircuitInput = self.input_data 144 next_line = ( 145 "result = converter.convert" 146 + f"(input_value={quantum_circuit_input.value_name})" 147 ) 148 if self.from_expression is QuantumExpression.MATRIX: 149 matrix_input: MatrixInput = self.input_data 150 next_line = add_new_line( 151 [ 152 "from qiskit import QuantumCircuit", 153 "" 154 f"result = converter.convert(input_value={matrix_input.value_name})", 155 f"quantum_circuit = QuantumCircuit({matrix_input.num_qubits})", 156 "quantum_circuit.append(result, list(range(result.num_qubits)))", 157 "quantum_circuit.measure_all()" if matrix_input.do_measure else "", 158 ] 159 ) 160 161 return add_new_line([first_line, next_line]) 162 163 def generate_visualization_code(self) -> str: 164 """generate visualiszation code according to the conversion method 165 166 Returns: 167 str: visualization code 168 """ 169 if self.to_expression is not self.from_expression: 170 if self.to_expression is QuantumExpression.MATRIX: 171 return add_new_line( 172 [ 173 "for gate, name in zip(reversed(result['gate']), reversed(result['name'])):", 174 """\tprint(f'{gate.strip()}_' + '{' + "\\\\otimes ".join(name[1]) + '}')""", 175 "print(f\"={result['result']}_{{result}}\")" 176 if self.shows_result 177 else "", 178 ] 179 ) 180 181 if self.to_expression is QuantumExpression.CIRCUIT: 182 return add_new_line( 183 [ 184 'quantum_circuit.draw(output="mpl")' 185 + f'.savefig("{self.__injected_sourcecode_path+".png"}", ' 186 + 'bbox_inches="tight")' 187 ] 188 ) 189 190 if self.to_expression is QuantumExpression.DIRAC: 191 return add_new_line(["print(result)"]) 192 else: 193 if self.to_expression is QuantumExpression.MATRIX: 194 matrix_input: MatrixInput = self.input_data 195 return add_new_line( 196 [f"print(array_to_latex({matrix_input.value_name}, source=True))"] 197 ) 198 if self.to_expression is QuantumExpression.CIRCUIT: 199 qunatum_input: QuantumCircuitInput = self.input_data 200 return add_new_line( 201 [ 202 f'{qunatum_input.value_name}.draw(output="mpl")' 203 + f'.savefig("{self.__injected_sourcecode_path+".png"}",' 204 + 'bbox_inches="tight")' 205 ] 206 ) 207 return "" 208 209 async def run(self) -> str: 210 """inject expression convert code to user's source code and create 211 subprocess for drawing converted expresion 212 213 Returns: 214 str: path of subprocess created image 215 """ 216 print("now running") 217 print(datetime.datetime.now().time()) 218 self.__generate_code() 219 stdout, stderr = await self.run_subprocess() 220 221 if stdout: 222 print(f"output {stdout}") 223 if stderr: 224 stderr: str = stderr 225 print(f"error {stderr}") 226 if stderr.find("SyntaxError") != -1: 227 raise SyntaxError 228 if stderr.find("NameError") != -1: 229 raise NameError 230 print("end at ") 231 print(datetime.datetime.now().time()) 232 233 # remove injected source code 234 if not self.cleanup(): 235 print("error removing file") 236 237 if self.to_expression is QuantumExpression.CIRCUIT: 238 return self.__injected_sourcecode_path + ".png" 239 240 return self.draw_latex(latex=stdout) 241 242 async def run_subprocess(self) -> (str, str): 243 """run generated script's subprocess 244 245 Returns: 246 (str, str): subprocess's stdout and stderr 247 """ 248 proc = await asyncio.create_subprocess_exec( 249 sys.executable, 250 self.__injected_sourcecode_path, 251 stdout=asyncio.subprocess.PIPE, 252 stderr=asyncio.subprocess.PIPE, 253 ) 254 stdout, stderr = await proc.communicate() 255 256 await proc.wait() 257 return (stdout.decode(), stderr.decode()) 258 259 def cleanup(self) -> bool: 260 """remove generated script 261 262 Returns: 263 bool: result of removing file 264 """ 265 try: 266 os.remove(self.__injected_sourcecode_path) 267 except FileNotFoundError: 268 return False 269 return True 270 271 def draw_latex(self, latex: str) -> str: # pragma: no cover 272 """ 273 render latex to image and save as file. 274 275 Args: 276 latex (str): latex matrix code 277 278 Raises: 279 MatrixNotFound: when latex not have matrix 280 281 Returns: 282 str: image file path 283 """ 284 285 # this code avoid latex runtime error (\n ocurse error) 286 latex = latex.replace("\n", " ").strip() 287 288 fig = plt.figure() 289 fig.text(0, 0, f"${latex}$") 290 output = self.__injected_sourcecode_path + ".png" 291 fig.savefig(output, dpi=200, bbox_inches="tight") 292 plt.close() 293 return output
42def add_new_line(strings: list[str]) -> str: 43 """add \\n between every line 44 45 Args: 46 strings (list[str]): list of line 47 48 Returns: 49 str: joined string with \\n 50 """ 51 return "\n".join(strings)
add \n between every line
Args: strings (list[str]): list of line
Returns: str: joined string with \n
55class ConverterWorker: 56 """worker for convert expression and visualize expression""" 57 58 def __init__( 59 self, 60 from_expression: QuantumExpression, 61 to_expression: QuantumExpression, 62 input_data: Input, 63 expression_text: str, 64 shows_result: bool, 65 ) -> None: 66 self.from_expression = from_expression 67 self.to_expression = to_expression 68 self.__injected_sourcecode_path = ConverterWorker.generate_random_file_name() 69 70 # copy text 71 self.expression_text = "" + expression_text 72 self.input_data = input_data 73 self.shows_result = shows_result 74 75 @staticmethod 76 def generate_random_file_name() -> str: # pragma: no cover 77 # this method implmented with random function 78 """return generated file name 79 80 Returns: 81 str: generated file name 82 """ 83 return "".join(random.choice(string.ascii_letters) for _ in range(10)) + ".py" 84 85 @staticmethod 86 def write_converting_code(file_path: str, code: str) -> bool: # pragma: no cover 87 """write code to file_path 88 89 Args: 90 file_path (str): target 91 code (str): contents 92 93 Returns: 94 bool: is succesful 95 """ 96 try: 97 with open(file_path, mode="w", encoding="UTF-8") as file: 98 file.write(code) 99 except FileNotFoundError: 100 return False 101 return True 102 103 def __generate_code(self): # pragma: no cover 104 expression_text = self.expression_text 105 if self.from_expression is QuantumExpression.MATRIX: 106 input_data: MatrixInput = self.input_data 107 expression_text = f"{input_data.value_name}={expression_text}" 108 ConverterWorker.write_converting_code( 109 self.__injected_sourcecode_path, 110 add_new_line( 111 [ 112 expression_text, 113 CONVERTER_IMPORT, 114 ARRAY_TO_LATEX_IMPORT, 115 self.generate_conversion_code(), 116 self.generate_visualization_code(), 117 ] 118 ), 119 ) 120 121 def generate_conversion_code(self) -> str: 122 """generate the conversion code according to the conversion method. 123 124 Returns: 125 str: generated conversion code 126 """ 127 if self.to_expression == self.from_expression: 128 return "" 129 matrix_to_qc_option: dict[str, str] = {"label": "unitary gate"} 130 default_option: dict[str, str] = {"print": "raw"} 131 132 option: dict[str, str] = {} 133 if self.to_expression is QuantumExpression.CIRCUIT: 134 option = matrix_to_qc_option 135 else: 136 option = default_option 137 first_line = ( 138 "converter = ConversionService(conversion_type=" 139 + f"'{self.from_expression.value[1]}_TO_{self.to_expression.value[1]}', " 140 + f"option={option})" 141 ) 142 next_line: str = "" 143 if self.from_expression is QuantumExpression.CIRCUIT: 144 quantum_circuit_input: QuantumCircuitInput = self.input_data 145 next_line = ( 146 "result = converter.convert" 147 + f"(input_value={quantum_circuit_input.value_name})" 148 ) 149 if self.from_expression is QuantumExpression.MATRIX: 150 matrix_input: MatrixInput = self.input_data 151 next_line = add_new_line( 152 [ 153 "from qiskit import QuantumCircuit", 154 "" 155 f"result = converter.convert(input_value={matrix_input.value_name})", 156 f"quantum_circuit = QuantumCircuit({matrix_input.num_qubits})", 157 "quantum_circuit.append(result, list(range(result.num_qubits)))", 158 "quantum_circuit.measure_all()" if matrix_input.do_measure else "", 159 ] 160 ) 161 162 return add_new_line([first_line, next_line]) 163 164 def generate_visualization_code(self) -> str: 165 """generate visualiszation code according to the conversion method 166 167 Returns: 168 str: visualization code 169 """ 170 if self.to_expression is not self.from_expression: 171 if self.to_expression is QuantumExpression.MATRIX: 172 return add_new_line( 173 [ 174 "for gate, name in zip(reversed(result['gate']), reversed(result['name'])):", 175 """\tprint(f'{gate.strip()}_' + '{' + "\\\\otimes ".join(name[1]) + '}')""", 176 "print(f\"={result['result']}_{{result}}\")" 177 if self.shows_result 178 else "", 179 ] 180 ) 181 182 if self.to_expression is QuantumExpression.CIRCUIT: 183 return add_new_line( 184 [ 185 'quantum_circuit.draw(output="mpl")' 186 + f'.savefig("{self.__injected_sourcecode_path+".png"}", ' 187 + 'bbox_inches="tight")' 188 ] 189 ) 190 191 if self.to_expression is QuantumExpression.DIRAC: 192 return add_new_line(["print(result)"]) 193 else: 194 if self.to_expression is QuantumExpression.MATRIX: 195 matrix_input: MatrixInput = self.input_data 196 return add_new_line( 197 [f"print(array_to_latex({matrix_input.value_name}, source=True))"] 198 ) 199 if self.to_expression is QuantumExpression.CIRCUIT: 200 qunatum_input: QuantumCircuitInput = self.input_data 201 return add_new_line( 202 [ 203 f'{qunatum_input.value_name}.draw(output="mpl")' 204 + f'.savefig("{self.__injected_sourcecode_path+".png"}",' 205 + 'bbox_inches="tight")' 206 ] 207 ) 208 return "" 209 210 async def run(self) -> str: 211 """inject expression convert code to user's source code and create 212 subprocess for drawing converted expresion 213 214 Returns: 215 str: path of subprocess created image 216 """ 217 print("now running") 218 print(datetime.datetime.now().time()) 219 self.__generate_code() 220 stdout, stderr = await self.run_subprocess() 221 222 if stdout: 223 print(f"output {stdout}") 224 if stderr: 225 stderr: str = stderr 226 print(f"error {stderr}") 227 if stderr.find("SyntaxError") != -1: 228 raise SyntaxError 229 if stderr.find("NameError") != -1: 230 raise NameError 231 print("end at ") 232 print(datetime.datetime.now().time()) 233 234 # remove injected source code 235 if not self.cleanup(): 236 print("error removing file") 237 238 if self.to_expression is QuantumExpression.CIRCUIT: 239 return self.__injected_sourcecode_path + ".png" 240 241 return self.draw_latex(latex=stdout) 242 243 async def run_subprocess(self) -> (str, str): 244 """run generated script's subprocess 245 246 Returns: 247 (str, str): subprocess's stdout and stderr 248 """ 249 proc = await asyncio.create_subprocess_exec( 250 sys.executable, 251 self.__injected_sourcecode_path, 252 stdout=asyncio.subprocess.PIPE, 253 stderr=asyncio.subprocess.PIPE, 254 ) 255 stdout, stderr = await proc.communicate() 256 257 await proc.wait() 258 return (stdout.decode(), stderr.decode()) 259 260 def cleanup(self) -> bool: 261 """remove generated script 262 263 Returns: 264 bool: result of removing file 265 """ 266 try: 267 os.remove(self.__injected_sourcecode_path) 268 except FileNotFoundError: 269 return False 270 return True 271 272 def draw_latex(self, latex: str) -> str: # pragma: no cover 273 """ 274 render latex to image and save as file. 275 276 Args: 277 latex (str): latex matrix code 278 279 Raises: 280 MatrixNotFound: when latex not have matrix 281 282 Returns: 283 str: image file path 284 """ 285 286 # this code avoid latex runtime error (\n ocurse error) 287 latex = latex.replace("\n", " ").strip() 288 289 fig = plt.figure() 290 fig.text(0, 0, f"${latex}$") 291 output = self.__injected_sourcecode_path + ".png" 292 fig.savefig(output, dpi=200, bbox_inches="tight") 293 plt.close() 294 return output
worker for convert expression and visualize expression
58 def __init__( 59 self, 60 from_expression: QuantumExpression, 61 to_expression: QuantumExpression, 62 input_data: Input, 63 expression_text: str, 64 shows_result: bool, 65 ) -> None: 66 self.from_expression = from_expression 67 self.to_expression = to_expression 68 self.__injected_sourcecode_path = ConverterWorker.generate_random_file_name() 69 70 # copy text 71 self.expression_text = "" + expression_text 72 self.input_data = input_data 73 self.shows_result = shows_result
75 @staticmethod 76 def generate_random_file_name() -> str: # pragma: no cover 77 # this method implmented with random function 78 """return generated file name 79 80 Returns: 81 str: generated file name 82 """ 83 return "".join(random.choice(string.ascii_letters) for _ in range(10)) + ".py"
return generated file name
Returns: str: generated file name
85 @staticmethod 86 def write_converting_code(file_path: str, code: str) -> bool: # pragma: no cover 87 """write code to file_path 88 89 Args: 90 file_path (str): target 91 code (str): contents 92 93 Returns: 94 bool: is succesful 95 """ 96 try: 97 with open(file_path, mode="w", encoding="UTF-8") as file: 98 file.write(code) 99 except FileNotFoundError: 100 return False 101 return True
write code to file_path
Args: file_path (str): target code (str): contents
Returns: bool: is succesful
121 def generate_conversion_code(self) -> str: 122 """generate the conversion code according to the conversion method. 123 124 Returns: 125 str: generated conversion code 126 """ 127 if self.to_expression == self.from_expression: 128 return "" 129 matrix_to_qc_option: dict[str, str] = {"label": "unitary gate"} 130 default_option: dict[str, str] = {"print": "raw"} 131 132 option: dict[str, str] = {} 133 if self.to_expression is QuantumExpression.CIRCUIT: 134 option = matrix_to_qc_option 135 else: 136 option = default_option 137 first_line = ( 138 "converter = ConversionService(conversion_type=" 139 + f"'{self.from_expression.value[1]}_TO_{self.to_expression.value[1]}', " 140 + f"option={option})" 141 ) 142 next_line: str = "" 143 if self.from_expression is QuantumExpression.CIRCUIT: 144 quantum_circuit_input: QuantumCircuitInput = self.input_data 145 next_line = ( 146 "result = converter.convert" 147 + f"(input_value={quantum_circuit_input.value_name})" 148 ) 149 if self.from_expression is QuantumExpression.MATRIX: 150 matrix_input: MatrixInput = self.input_data 151 next_line = add_new_line( 152 [ 153 "from qiskit import QuantumCircuit", 154 "" 155 f"result = converter.convert(input_value={matrix_input.value_name})", 156 f"quantum_circuit = QuantumCircuit({matrix_input.num_qubits})", 157 "quantum_circuit.append(result, list(range(result.num_qubits)))", 158 "quantum_circuit.measure_all()" if matrix_input.do_measure else "", 159 ] 160 ) 161 162 return add_new_line([first_line, next_line])
generate the conversion code according to the conversion method.
Returns: str: generated conversion code
164 def generate_visualization_code(self) -> str: 165 """generate visualiszation code according to the conversion method 166 167 Returns: 168 str: visualization code 169 """ 170 if self.to_expression is not self.from_expression: 171 if self.to_expression is QuantumExpression.MATRIX: 172 return add_new_line( 173 [ 174 "for gate, name in zip(reversed(result['gate']), reversed(result['name'])):", 175 """\tprint(f'{gate.strip()}_' + '{' + "\\\\otimes ".join(name[1]) + '}')""", 176 "print(f\"={result['result']}_{{result}}\")" 177 if self.shows_result 178 else "", 179 ] 180 ) 181 182 if self.to_expression is QuantumExpression.CIRCUIT: 183 return add_new_line( 184 [ 185 'quantum_circuit.draw(output="mpl")' 186 + f'.savefig("{self.__injected_sourcecode_path+".png"}", ' 187 + 'bbox_inches="tight")' 188 ] 189 ) 190 191 if self.to_expression is QuantumExpression.DIRAC: 192 return add_new_line(["print(result)"]) 193 else: 194 if self.to_expression is QuantumExpression.MATRIX: 195 matrix_input: MatrixInput = self.input_data 196 return add_new_line( 197 [f"print(array_to_latex({matrix_input.value_name}, source=True))"] 198 ) 199 if self.to_expression is QuantumExpression.CIRCUIT: 200 qunatum_input: QuantumCircuitInput = self.input_data 201 return add_new_line( 202 [ 203 f'{qunatum_input.value_name}.draw(output="mpl")' 204 + f'.savefig("{self.__injected_sourcecode_path+".png"}",' 205 + 'bbox_inches="tight")' 206 ] 207 ) 208 return ""
generate visualiszation code according to the conversion method
Returns: str: visualization code
210 async def run(self) -> str: 211 """inject expression convert code to user's source code and create 212 subprocess for drawing converted expresion 213 214 Returns: 215 str: path of subprocess created image 216 """ 217 print("now running") 218 print(datetime.datetime.now().time()) 219 self.__generate_code() 220 stdout, stderr = await self.run_subprocess() 221 222 if stdout: 223 print(f"output {stdout}") 224 if stderr: 225 stderr: str = stderr 226 print(f"error {stderr}") 227 if stderr.find("SyntaxError") != -1: 228 raise SyntaxError 229 if stderr.find("NameError") != -1: 230 raise NameError 231 print("end at ") 232 print(datetime.datetime.now().time()) 233 234 # remove injected source code 235 if not self.cleanup(): 236 print("error removing file") 237 238 if self.to_expression is QuantumExpression.CIRCUIT: 239 return self.__injected_sourcecode_path + ".png" 240 241 return self.draw_latex(latex=stdout)
inject expression convert code to user's source code and create subprocess for drawing converted expresion
Returns: str: path of subprocess created image
243 async def run_subprocess(self) -> (str, str): 244 """run generated script's subprocess 245 246 Returns: 247 (str, str): subprocess's stdout and stderr 248 """ 249 proc = await asyncio.create_subprocess_exec( 250 sys.executable, 251 self.__injected_sourcecode_path, 252 stdout=asyncio.subprocess.PIPE, 253 stderr=asyncio.subprocess.PIPE, 254 ) 255 stdout, stderr = await proc.communicate() 256 257 await proc.wait() 258 return (stdout.decode(), stderr.decode())
run generated script's subprocess
Returns: (str, str): subprocess's stdout and stderr
260 def cleanup(self) -> bool: 261 """remove generated script 262 263 Returns: 264 bool: result of removing file 265 """ 266 try: 267 os.remove(self.__injected_sourcecode_path) 268 except FileNotFoundError: 269 return False 270 return True
remove generated script
Returns: bool: result of removing file
272 def draw_latex(self, latex: str) -> str: # pragma: no cover 273 """ 274 render latex to image and save as file. 275 276 Args: 277 latex (str): latex matrix code 278 279 Raises: 280 MatrixNotFound: when latex not have matrix 281 282 Returns: 283 str: image file path 284 """ 285 286 # this code avoid latex runtime error (\n ocurse error) 287 latex = latex.replace("\n", " ").strip() 288 289 fig = plt.figure() 290 fig.text(0, 0, f"${latex}$") 291 output = self.__injected_sourcecode_path + ".png" 292 fig.savefig(output, dpi=200, bbox_inches="tight") 293 plt.close() 294 return output
render latex to image and save as file.
Args: latex (str): latex matrix code
Raises: MatrixNotFound: when latex not have matrix
Returns: str: image file path