Pythonでもオレオレ文字コードを実装したい (3)
実装の流れ
自作エンコードをPythonで扱うための実装の手順は以下の感じっぽいです。
7.2. codecs — codec レジストリと基底クラス — Python 3.3.6 ドキュメント
- エンコーダ・デコーダの実装
- エンコーダ・デコーダとcodecsモジュールに含まれているStreamWriteとStreamReaderの基底クラスを用いていい感じにStreamWriterとStreamReaderの実装
- 同じくエンコーダ・デコーダとcodecsモジュールに含まれているIncrementalEncoderとIncrementalDecoderの基底クラスを用いていい感じにIncrementalEncoderとIncrementalDecoderの実装(任意)
- 1〜3で作った物を使用してCodecInfoオブジェクトを構築
- 自作エンコーディング名からcodec.CodecInfoオブジェクトを検索して返す検索関数を実装
- 検索関数をcodecs.registerに登録
- たのしい
今回定義した文字符号化方式はステートレスな上にすべてがシングルバイトですから単純にバイトとUnicode文字をマップしてあげれば比較的簡単にエンコーダとデコーダを実装できそうです。
StreamReaderとSteamWriterはモジュールに含まれている基底クラスを自作したエンコーダ・デコーダで拡張してあげれば比較的簡単に実装できます。
そういえば、今回作成した符号化文字集合と文字符号化方式の名前を決めていませんでした。 文字集合を『Jyuchラテンアルファベット・かな文字集合』、符号化方式を『Jyuch-Kana』という名前にしましょう。 符号化文字集合の名前はどうでもいいのですが、符号化方式の方はPythonコーデックレジストリに登録するときに名前で登録しなくてはならないので、何かしらは決めておかないといけません。
エンコーダとデコーダ
兎にも角にもまずはエンコーダとデコーダを実装しなければ話になりません。
エンコーダとデコーダはcodecsモジュールのCodec基底クラスのencode()
とdecode()
をオーバーライド(するか、同じシグネチャを持つ関数を定義)して実装します。
それぞれ詳しく見ていきましょう。ここがメインですから。
エンコードとデコード
今まで当たり前のようにエンコードとデコードという言葉を使っていましたが、意味を再確認してみましょう。
Pythonでは文字をUnicodeとして表します。 しかし、ファイルでは文字をバイト列として表現します。 つまり、Pythonでテキストファイルを扱うためにはUnicodeとして表現された文字とバイト列として表現された文字をそれぞれ変換するための何かが必要になります。
このUnicodeからバイト列へ変換を行うのがエンコーダ、逆にバイト列からUnicodeへ変換を行うのがデコーダです。
encoder(Unicode) -> バイト列 decoder(バイト列) -> Unicode
また、当たり前ですがバイト列をUnicodeにデコードした後、そのUnicodeをバイト列にエンコードしたら元のバイト列にならなくてはなりません。
decoder(encoder(Unicode)) -> 元のUnicode encoder(decoder(バイト列)) -> 元のバイト列
エンコーダの実装
エンコーダはcodecs.Codecインターフェースの実装として定義します。
エンコーダは以下のシグネチャを持つ関数として定義されます。
encode(input[, errors])
オブジェクトinput
をエンコードし、(出力オブジェクト、消費したinputオブジェクトの長さ)
を保持するタプルを返します。
errors
はエンコード中にエラーが発生した場合のエンコーダの挙動を指定するために用いる文字列オブジェクトです。
標準がstrict
ですが、他にも標準ライブラリには幾つかの文字列が定義されています。
今回は正直遊びで作ってる実用性皆無の符号化方式に真面目にエラー処理を行う気がない時間の都合上strict
のみの実装にし、他のを渡されても無視する事にします。
ということで、エンコーダをポンと実装します。
def encode(input, errors='strict'): buffer = [] for i, it in zip(count(0), input): try: buffer.append(convert_unicode_to_hex(it)) except ValueError: raise UnicodeError( "'jyuch-kana' codec can't encode characters in position {0}".format(i) ) return bytes(buffer), len(input) def convert_unicode_to_hex(unicode): if unicode == chr(0x00): return 0x00 elif unicode == chr(0x0A): return 0x01 elif unicode == chr(0x0D): return 0x02 elif unicode == chr(0x20): return 0x10 elif chr(0x41) <= unicode <= chr(0x5A): return ord(unicode) - 0x30 elif chr(0x61) <= unicode <= chr(0x7A): return ord(unicode) - 0x36 elif unicode == chr(0x3000): return 0x50 elif chr(0x3041) <= unicode <= chr(0x3093): return ord(unicode) - 0x2FF0 else: raise ValueError()
convert_unicode_to_hex
関数でUnicode一文字を等価なバイナリ値に変換し、convert_unicode_to_hex
を用いてencode
メソッドで実際に文字列からバイナリ列に変換を行います。
また、エンコードできない文字を受け取った場合はUnicodeError
をスローします。
本当はUnicodeEncodeError
をスローして詳細な情報を上位のメソッドに通知したほうがいいと思うのですが、UnicodeEncodeError
をうまく生成出来ずに断念しました。
ググってもキャッチする方しか出てこず、スローする方の情報が見当たりません。
詳しいことを知っている方がいましたら教えてください。
また、お気づきの方もいると思いますがcodecs.Codec
の実際のencode
メソッドはencode(self, input, errors='strict')
ですが、この関数にはself
がありません。が、大丈夫です。後でいい感じになんとかします。
デコーダ
同様にデコーダも実装します。
decode(input[, errors])
エンコーダと同様にオブジェクトinput
をデコードし、(出力オブジェクト、消費したinputオブジェクトの長さ)
を保持するタプルを返します。
def decode(input, errors='strict'): buffer = [] for i, it in zip(count(0), input): try: buffer.append(convert_hex_to_unicode(it)) except ValueError: raise UnicodeError( "'jyuch-kana' codec can't decode characters in position {0}".format(i) ) return ''.join(buffer), len(input) def convert_hex_to_unicode(hex): if hex == 0x00: return chr(0x00) elif hex == 0x01: return chr(0x0A) elif hex == 0x02: return chr(0x0D) elif hex == 0x10: return chr(0x20) elif 0x11 <= hex <= 0x2A: return chr(hex + 0x30) elif 0x2B <= hex <= 0x44: return chr(hex + 0x36) elif hex == 0x50: return chr(0x3000) elif 0x51 <= hex <= 0xA3: return chr(hex + 0x2FF0) else: raise ValueError()
プログラムの構造も基本的にはエンコーダと同じで、単一のバイナリを等価なUnicodeに変換する関数convert_hex_to_unicode
とそれを組み合わせてバイナリ列を等価なUnicode文字列に変換するdecode
関数として定義します。
StreamReaderとStreamWriterの実装
先ほど実装したエンコーダ・デコーダを用いてStreamReader
とStreamWriter
のサブクラスを実装します。
これは、実際のコード見てもらったほうが簡単だと思います。
class StreamWriter(codecs.StreamWriter): encode = lambda self, input, errors: encode(input, errors) class StreamReader(codecs.StreamReader): decode = lambda self, input, errors: decode(input, errors)
終わりです。
それぞれのencode
とdecode
を自作したエンコーダとデコーダで置き換えて終了です。
残りの処理についてはそれぞれの基底クラスがいい感じにやってくれます。
CodecInfoオブジェクトの生成
これまで実装したエンコーダ・デコーダ・StreamReader・StreamWriterを用いてJyuch-KanaのCodecInfo
オブジェクトを構築します。
標準ライブラリではCodecInfoオブジェクトを構築して返す関数を定義するという形になっていたのでそれに従い実装します。
def getregentry(): return codecs.CodecInfo( name='jyuch-kana', encode=lambda self, input, errors: encode(input, errors), decode=lambda self, input, errors: decode(input, errors), streamreader=StreamReader, streamwriter=StreamWriter, )
Pythonではコーデック名はすべて小文字で統一するみたいです。
検索関数の登録
ここまでくれば後はもうちょいです。
自作したコーデック名からCodecInfoオブジェクトを返す関数を実装し、それをcodecs.register
に登録すれば完了です。
def search_function(codec_name): if codec_name == 'jyuch-kana': return getregentry() else: return None def registry(): codecs.register(search_function) registry()
ここでのキモは、検索関数が自分の知らないコーデック名を渡された時は下手に変なのを返すのではなく素直にNone
を返してあげることです。
そうドキュメントに書いてありました。はい。
コーデックの全体
ポイントポイントでプログラムを抜き出したので、正直わかりにくいと思います。 以下が今回実装したコーデックの全体です。
""" 'Jyuch-Kana' Codec Written by jyuch (http://jyuch.hatenablog.com/) """ import codecs from itertools import count def encode(input, errors='strict'): buffer = [] for i, it in zip(count(0), input): try: buffer.append(convert_unicode_to_hex(it)) except ValueError: raise UnicodeError( "'jyuch-kana' codec can't encode characters in position {0}".format(i) ) return bytes(buffer), len(input) def convert_unicode_to_hex(unicode): if unicode == chr(0x00): return 0x00 elif unicode == chr(0x0A): return 0x01 elif unicode == chr(0x0D): return 0x02 elif unicode == chr(0x20): return 0x10 elif chr(0x41) <= unicode <= chr(0x5A): return ord(unicode) - 0x30 elif chr(0x61) <= unicode <= chr(0x7A): return ord(unicode) - 0x36 elif unicode == chr(0x3000): return 0x50 elif chr(0x3041) <= unicode <= chr(0x3093): return ord(unicode) - 0x2FF0 else: raise ValueError() def decode(input, errors='strict'): buffer = [] for i, it in zip(count(0), input): try: buffer.append(convert_hex_to_unicode(it)) except ValueError: raise UnicodeError( "'jyuch-kana' codec can't decode characters in position {0}".format(i) ) return ''.join(buffer), len(input) def convert_hex_to_unicode(hex): if hex == 0x00: return chr(0x00) elif hex == 0x01: return chr(0x0A) elif hex == 0x02: return chr(0x0D) elif hex == 0x10: return chr(0x20) elif 0x11 <= hex <= 0x2A: return chr(hex + 0x30) elif 0x2B <= hex <= 0x44: return chr(hex + 0x36) elif hex == 0x50: return chr(0x3000) elif 0x51 <= hex <= 0xA3: return chr(hex + 0x2FF0) else: raise ValueError() class StreamWriter(codecs.StreamWriter): encode = lambda self, input, errors: encode(input, errors) class StreamReader(codecs.StreamReader): decode = lambda self, input, errors: decode(input, errors) def getregentry(): return codecs.CodecInfo( name='jyuch-kana', encode=lambda self, input, errors: encode(input, errors), decode=lambda self, input, errors: decode(input, errors), streamreader=StreamReader, streamwriter=StreamWriter, ) def search_function(codec_name): if codec_name == 'jyuch-kana': return getregentry() else: return None def registry(): codecs.register(search_function) registry()
前回クッソ長い符号化文字集合表をダイレクトに載せといてあれですけど、このエントリも結構な長さになってしまったので実際の使い方については次の記事に書こうと思います。